loom.etl.testing

ETL testing utilities — stubs, runners, scenarios, and pytest fixtures.

Public API

Pytest fixtures (auto-registered)

Spark

Import loom.etl.testing.spark explicitly to access SparkStepRunner and SparkTestSession. That module is the only place where PySpark is imported at module level — loom.etl.testing itself has no PySpark dependency.

Functions

loom_polars_runner()

Fresh PolarsStepRunner per test — no Delta I/O, in-memory only.

class loom.etl.testing.StubCatalog(tables=None, schemas=None)[source]

Bases: object

In-memory TableDiscovery for tests.

Accepts two optional seed dictionaries:

  • tables — column name tuples (for quick existence + column checks)

  • schemas — full ColumnSchema tuples (for type-aware writer tests)

When schemas is provided, columns and exists are also derived from it — no need to populate both. update_schema persists changes in memory so later steps in the same test see the evolved schema.

Parameters:
  • tables (dict[str, tuple[str, ...]] | None) – Mapping of "schema.table" → ("col1", "col2", ...).

  • schemas (dict[str, tuple[ColumnSchema, ...]] | None) – Mapping of "schema.table" → tuple[ColumnSchema, ...].

Example:

catalog = StubCatalog(
    schemas={
        "raw.orders": (
            ColumnSchema("id",     LoomDtype.INT64,   nullable=False),
            ColumnSchema("amount", LoomDtype.FLOAT64),
            ColumnSchema("year",   LoomDtype.INT32,   nullable=False),
        ),
        "staging.out": (),
    }
)
assert catalog.exists(TableRef("raw.orders"))
assert catalog.schema(TableRef("raw.orders"))[0].name == "id"
exists(ref)[source]

Return True if the ref is registered in this stub catalog.

Parameters:

ref (TableRef)

Return type:

bool

columns(ref)[source]

Return registered column names, or () for unknown tables.

Parameters:

ref (TableRef)

Return type:

tuple[str, …]

schema(ref)[source]

Return the full schema, or None if the table is not yet registered.

Parameters:

ref (TableRef)

Return type:

tuple[ColumnSchema, …] | None

update_schema(ref, schema)[source]

Persist a schema update in memory.

Parameters:
  • ref (TableRef)

  • schema (tuple[ColumnSchema, ...])

Return type:

None
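The derive-from-schemas and persist-in-memory behaviour described above can be sketched with a plain dict-backed class. This is an illustrative stand-in, not loom's implementation — `ColumnSchema` here is a hypothetical dataclass with string dtypes:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnSchema:
    """Stand-in for loom's ColumnSchema — name, dtype, nullability."""
    name: str
    dtype: str
    nullable: bool = True


class MiniCatalog:
    """Minimal sketch of StubCatalog's schema-seeded mode."""

    def __init__(self, schemas):
        self._schemas = dict(schemas)

    def exists(self, ref: str) -> bool:
        # Derived from the schemas seed — no separate `tables` dict needed.
        return ref in self._schemas

    def columns(self, ref: str) -> tuple:
        # Unknown tables yield () rather than raising.
        return tuple(c.name for c in self._schemas.get(ref, ()))

    def update_schema(self, ref: str, schema: tuple) -> None:
        # Persisted in memory: later steps in the same test see the change.
        self._schemas[ref] = schema


catalog = MiniCatalog({"raw.orders": (ColumnSchema("id", "int64", nullable=False),)})
assert catalog.exists("raw.orders")
assert catalog.columns("raw.orders") == ("id",)

# Schema evolution persists for the rest of the test.
catalog.update_schema(
    "raw.orders",
    (ColumnSchema("id", "int64", nullable=False), ColumnSchema("amount", "float64")),
)
assert catalog.columns("raw.orders") == ("id", "amount")
```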

class loom.etl.testing.StubSourceReader(frames=None)[source]

Bases: object

In-memory SourceReader for tests.

Returns pre-seeded frames keyed by source alias. Reading an alias that was not seeded returns None — tests should seed every alias they expect to read.

Parameters:

frames (dict[str, Any] | None) – Mapping of source alias → frame object.

Example:

reader = StubSourceReader({"orders": pl.LazyFrame({"id": [1, 2]})})
frame = reader.read(orders_spec, params)
assert frame is not None
read(spec, _params_instance)[source]

Return the pre-seeded frame for spec.alias, or None.

Parameters:
  • spec – Source spec; spec.alias selects the pre-seeded frame.

  • _params_instance (Any)

Return type:

Any

execute_sql(frames, query, /)[source]

SQL execution is not supported by the generic in-memory stub.

Parameters:
  • frames

  • query

Return type:

Any
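The alias lookup underlying read() is a plain dict lookup. A minimal sketch of the pattern (frames here are plain lists rather than LazyFrames, and the class is a stand-in, not loom's code):

```python
class MiniSourceReader:
    """Minimal sketch of the stub's alias → frame lookup."""

    def __init__(self, frames=None):
        self._frames = dict(frames or {})

    def read(self, alias: str):
        # Unseeded aliases return None rather than raising.
        return self._frames.get(alias)


reader = MiniSourceReader({"orders": [(1, 10.0), (2, 20.0)]})
assert reader.read("orders") == [(1, 10.0), (2, 20.0)]
assert reader.read("missing") is None
```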

class loom.etl.testing.StubTargetWriter[source]

Bases: object

In-memory TargetWriter for tests.

Captures all write calls in written for post-execution assertion.

written

List of (frame, spec) pairs in write order.

streaming_flags

Streaming flag values received for each write call.

Example:

writer = StubTargetWriter()
writer.write(frame, spec, params)
assert len(writer.written) == 1
frame_out, spec_out = writer.written[0]
assert isinstance(spec_out, ReplaceSpec)
write(frame, spec, _params_instance, *, streaming=False)[source]

Capture the (frame, spec) pair for later assertion.

Parameters:
  • frame (Any)

  • spec (AppendSpec | ReplaceSpec | ReplacePartitionsSpec | ReplaceWhereSpec | UpsertSpec | HistorifySpec | FileSpec | TempSpec | TempFanInSpec)

  • _params_instance (Any)

  • streaming (bool)

Return type:

None
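The capture-everything pattern behind written and streaming_flags can be sketched in a few lines. This is a hypothetical stand-in class, not the loom implementation:

```python
class MiniTargetWriter:
    """Minimal sketch of StubTargetWriter's capture pattern."""

    def __init__(self):
        self.written = []          # (frame, spec) pairs in write order
        self.streaming_flags = []  # streaming flag per write call

    def write(self, frame, spec, _params_instance, *, streaming=False):
        # Nothing is persisted — calls are only recorded for later assertions.
        self.written.append((frame, spec))
        self.streaming_flags.append(streaming)


writer = MiniTargetWriter()
writer.write([(1, 10.0)], "replace-spec", None)
writer.write([(2, 20.0)], "append-spec", None, streaming=True)

assert len(writer.written) == 2
assert writer.streaming_flags == [False, True]
```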

class loom.etl.testing.StubRunObserver[source]

Bases: object

In-memory ETLRunObserver for tests.

Captures all lifecycle events as a flat list of (event_name, data) tuples. Helper properties provide quick access to common assertions.

events

All events in order — (event_name, data_dict) tuples.

Example:

observer = StubRunObserver()
executor = ETLExecutor(reader, writer, observers=[observer])
executor.run_step(plan, params)

assert observer.step_statuses == ["success"]
assert "step_start" in observer.event_names
property event_names: list[EventName]

Ordered list of event names.

property step_statuses: list[RunStatus]

Statuses from all step_end events in order.

property pipeline_statuses: list[RunStatus]

Statuses from all pipeline_end events in order.
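The helper properties are simple filters over the flat events list. A minimal sketch of that pattern, using a hypothetical record() method in place of the real lifecycle hooks:

```python
class MiniRunObserver:
    """Minimal sketch of the flat event log and its derived properties."""

    def __init__(self):
        self.events = []  # (event_name, data) tuples in order

    def record(self, name, data):
        self.events.append((name, data))

    @property
    def event_names(self):
        return [name for name, _ in self.events]

    @property
    def step_statuses(self):
        # Filter the flat log down to step_end events, preserving order.
        return [data["status"] for name, data in self.events if name == "step_end"]


obs = MiniRunObserver()
obs.record("step_start", {"step": "double_amount"})
obs.record("step_end", {"step": "double_amount", "status": "success"})

assert "step_start" in obs.event_names
assert obs.step_statuses == ["success"]
```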

class loom.etl.testing.PolarsStepRunner[source]

Bases: object

In-memory test harness for Polars ETLStep subclasses.

Seeds are plain Python tuples — no Polars dependency at definition time. Internally, each seed is converted to a polars.LazyFrame so execute() receives the same type as in production.

No Delta I/O — reads and writes are captured in memory.

Example:

def test_double_amount(loom_polars_runner):
    loom_polars_runner.seed("raw.orders", [(1, 10.0), (2, 20.0)], ["id", "amount"])
    result = loom_polars_runner.run(DoubleAmountStep, NoParams())
    result.assert_count(2)
    result.assert_schema({"amount": LoomDtype.FLOAT64})
    result.show()
seed(ref, data, columns)[source]

Register raw data under the logical table reference ref.

Parameters:
  • ref (str) – Logical table reference, e.g. "raw.orders".

  • data (list[tuple[Any, ...]]) – Row data as a list of tuples.

  • columns (list[str]) – Column names aligned with the tuple positions.

Returns:

self for fluent chaining.

Return type:

PolarsStepRunner
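Seeding row tuples plus column names implies a transpose into columnar form before a polars.LazyFrame can be built. A sketch of that conversion (the helper name is hypothetical; polars itself is not needed for the transpose):

```python
def rows_to_columns(data, columns):
    """Transpose row tuples into the column dict a LazyFrame is built from."""
    transposed = list(zip(*data)) if data else [[] for _ in columns]
    return {name: list(values) for name, values in zip(columns, transposed)}


cols = rows_to_columns([(1, 10.0), (2, 20.0)], ["id", "amount"])
assert cols == {"id": [1, 2], "amount": [10.0, 20.0]}

# In production the runner would hand this to polars, roughly:
#   pl.LazyFrame(cols)
```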

run(step_cls, params)[source]

Compile and execute step_cls against the seeded tables.

Parameters:
  • step_cls (type[Any]) – ETLStep subclass to execute.

  • params (Any) – Concrete params instance for this run.

Returns:

StepResult for assertions.

Raises:
  • KeyError – When a source table was not seeded.

  • RuntimeError – When the step produced no output.

Return type:

StepResult

property target_spec: AppendSpec | ReplaceSpec | ReplacePartitionsSpec | ReplaceWhereSpec | UpsertSpec | HistorifySpec | FileSpec | TempSpec | TempFanInSpec

Target spec from the last run() call.

Raises:

RuntimeError – When run() has not been called yet.

property resolved_predicate: str | None

SQL predicate resolved from the last run’s target spec and params.

Returns None when the write mode has no predicate.

class loom.etl.testing.StepResult(_frame)[source]

Bases: object

Output of a PolarsStepRunner or SparkStepRunner run.

Provides a backend-agnostic assertion surface. The internal frame is always a materialised polars.DataFrame — Spark results are converted before constructing this object.

Parameters:

_frame (polars.DataFrame) – Materialised output frame.

Example:

result = runner.run(MyStep, params)
result.assert_count(10)
result.assert_schema({"id": LoomDtype.INT64, "amount": LoomDtype.FLOAT64})
result.show()
to_polars()[source]

Return the materialised output as a polars.DataFrame.

Return type:

polars.DataFrame

assert_schema(expected)[source]

Assert that the output columns match the expected Loom schema.

Parameters:

expected (dict[str, LoomDtype]) – Mapping of column name → LoomDtype.

Raises:

AssertionError – When a column is missing or has a different dtype.

Return type:

None

assert_count(n)[source]

Assert the result contains exactly n rows.

Parameters:

n (int) – Expected row count.

Raises:

AssertionError – When the row count differs.

Return type:

None

assert_not_empty()[source]

Assert the result contains at least one row.

Raises:

AssertionError – When the result is empty.

Return type:

None

show(n=10)[source]

Print the first n rows to stdout.

Useful for visual inspection during pytest -s runs.

Parameters:

n (int) – Number of rows to display. Defaults to 10.

Return type:

None
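The backend-agnostic assertion surface can be sketched over a plain column dict standing in for the materialised polars.DataFrame. This is an illustrative stand-in — the dtype check here uses Python types, not LoomDtype:

```python
class MiniStepResult:
    """Minimal sketch of StepResult's assertion surface."""

    def __init__(self, frame):
        self._frame = frame  # {"col": [values]} stand-in for a polars.DataFrame

    def assert_count(self, n):
        rows = len(next(iter(self._frame.values()), []))
        assert rows == n, f"expected {n} rows, got {rows}"

    def assert_not_empty(self):
        assert any(len(v) > 0 for v in self._frame.values()), "result is empty"

    def assert_schema(self, expected):
        # expected maps column name → Python type (LoomDtype in the real API).
        for name, dtype in expected.items():
            assert name in self._frame, f"missing column {name!r}"
            assert all(isinstance(v, dtype) for v in self._frame[name]), (
                f"column {name!r} is not all {dtype.__name__}"
            )


result = MiniStepResult({"id": [1, 2], "amount": [20.0, 40.0]})
result.assert_count(2)
result.assert_not_empty()
result.assert_schema({"id": int, "amount": float})
```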

class loom.etl.testing.ETLScenario[source]

Bases: object

Named, reusable seed dataset for step runner tests.

Stores input data as plain Python tuples — no backend dependency at definition time. Data is passed to the runner’s seed() when apply() is called, so any StepRunnerProto-compatible runner works.

Example:

ORDERS = (
    ETLScenario()
    .with_table("raw.orders", [(1, 10.0), (2, 20.0)], ["id", "amount"])
)

def test_double_amount(loom_polars_runner):
    ORDERS.apply(loom_polars_runner)
    result = loom_polars_runner.run(DoubleAmountStep, NoParams())
    result.assert_count(2)
with_table(ref, data, columns)[source]

Add a table seed to this scenario.

Parameters:
  • ref (str) – Logical table reference, e.g. "raw.orders".

  • data (list[tuple[Any, ...]]) – Row data as a list of tuples.

  • columns (list[str]) – Column names aligned with the tuple positions.

Returns:

self for fluent chaining.

Return type:

ETLScenario

apply(runner)[source]

Seed all tables into runner and return it.

Parameters:

runner (StepRunnerProto) – Any StepRunnerProto-compatible runner.

Returns:

The same runner for fluent chaining.

Return type:

StepRunnerProto
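The scenario pattern — store seeds as plain tuples, replay them through any runner's seed() — can be sketched with a stand-in scenario and a recording runner (both names hypothetical):

```python
class MiniScenario:
    """Minimal sketch of ETLScenario: store seeds, replay via seed()."""

    def __init__(self):
        self._tables = []

    def with_table(self, ref, data, columns):
        self._tables.append((ref, data, columns))
        return self  # fluent chaining

    def apply(self, runner):
        for ref, data, columns in self._tables:
            runner.seed(ref, data, columns)
        return runner  # same runner back, for chaining


class RecordingRunner:
    """Stand-in runner that just records seed() calls."""

    def __init__(self):
        self.seeded = {}

    def seed(self, ref, data, columns):
        self.seeded[ref] = (data, columns)
        return self


scenario = MiniScenario().with_table("raw.orders", [(1, 10.0)], ["id", "amount"])
runner = scenario.apply(RecordingRunner())
assert runner.seeded["raw.orders"] == ([(1, 10.0)], ["id", "amount"])
```

Because the seeds are plain tuples, the same scenario object can be applied to a Polars runner in one test and a Spark runner in another.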

class loom.etl.testing.StepRunnerProto(*args, **kwargs)[source]

Bases: Protocol

Protocol satisfied by both PolarsStepRunner and SparkStepRunner.

Any object implementing seed() and run() is compatible with ETLScenario.

seed(ref, data, columns)[source]

Register raw data under the logical table reference ref.

Parameters:
  • ref (str)

  • data (list[tuple[Any, ...]])

  • columns (list[str])

Return type:

Any
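Because compatibility is structural, no inheritance is required. A sketch of the idea with a hypothetical runtime-checkable protocol (the real StepRunnerProto may or may not be decorated with runtime_checkable):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class MiniRunnerProto(Protocol):
    """Structural sketch: anything with seed() and run() qualifies."""

    def seed(self, ref: str, data: list, columns: list) -> Any: ...
    def run(self, step_cls: type, params: Any) -> Any: ...


class DuckRunner:
    # No base class needed — structural typing makes this compatible.
    def seed(self, ref, data, columns):
        return self

    def run(self, step_cls, params):
        return "result"


# runtime_checkable isinstance only verifies the methods exist.
assert isinstance(DuckRunner(), MiniRunnerProto)
```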