loom.etl.testing¶
ETL testing utilities — stubs, runners, scenario, and pytest fixtures.
Public API¶
- StubCatalog — in-memory catalog for compile-time tests.
- StubSourceReader — in-memory reader, keyed by table reference.
- StubTargetWriter — in-memory writer that captures written frames.
- StubRunObserver — in-memory observer that records lifecycle events.
- PolarsStepRunner — Polars-backed in-memory step harness.
- StepResult — unified assertion surface for step output.
- ETLScenario — backend-agnostic reusable seed dataset.
- StepRunnerProto — protocol satisfied by both runner types.
Pytest fixtures (auto-registered)¶
- loom_polars_runner — fresh PolarsStepRunner per test.
- loom_spark_runner — fresh SparkStepRunner per test (requires loom_spark_session, registered in loom.etl.testing.spark).
Spark¶
Import loom.etl.testing.spark explicitly to access SparkStepRunner and SparkTestSession. That module is the only place where PySpark is imported at module level — this file has zero PySpark dependency.
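The import discipline described here can be sketched as a small helper (illustrative only: the function name is made up, and a stdlib module stands in for the PySpark-backed submodule so the sketch is self-contained):

```python
import importlib
import sys


def optional_backend(module_name: str):
    """Import an optional backend module only when a test first asks for it.

    Keeping the heavy dependency in its own module means the base package
    never imports it at module level -- the arrangement the Spark note
    describes. Nothing here is loom's actual code.
    """
    if module_name in sys.modules:
        return sys.modules[module_name]
    return importlib.import_module(module_name)


# "decimal" stands in for the hypothetical heavy backend module here.
backend = optional_backend("decimal")
assert hasattr(backend, "Decimal")
```

In the real library, a test opts in by importing loom.etl.testing.spark directly; nothing in the base loom.etl.testing module triggers that import.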
- class loom.etl.testing.StubCatalog(tables=None, schemas=None)[source]¶

  Bases: object

  In-memory TableDiscovery for tests.

  Accepts two optional seed dictionaries:

  - tables — column name tuples (for quick existence + column checks)
  - schemas — full ColumnSchema tuples (for type-aware writer tests)

  When schemas is provided, columns and exists are also derived from it — no need to populate both. update_schema persists changes in memory so later steps in the same test see the evolved schema.

  Example:

      catalog = StubCatalog(
          schemas={
              "raw.orders": (
                  ColumnSchema("id", LoomDtype.INT64, nullable=False),
                  ColumnSchema("amount", LoomDtype.FLOAT64),
                  ColumnSchema("year", LoomDtype.INT32, nullable=False),
              ),
              "staging.out": (),
          }
      )
      assert catalog.exists(TableRef("raw.orders"))
      assert catalog.schema(TableRef("raw.orders"))[0].name == "id"
  - schema(ref)[source]¶

    Return the full schema, or None if the table is not yet registered.

    - Parameters:
      ref (TableRef)

    - Return type:
      tuple[ColumnSchema, ...] | None
  - update_schema(ref, schema)[source]¶

    Persist a schema update in memory.

    - Parameters:
      ref (TableRef)
      schema (tuple[ColumnSchema, ...])

    - Return type:
      None
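To make the derived-lookup behaviour concrete, here is a minimal toy model of the semantics described above (the class mirrors the documented behaviour, but the implementation is an assumption for illustration, not loom's code — plain (name, dtype) pairs stand in for ColumnSchema):

```python
class MiniCatalog:
    """Toy model of the StubCatalog semantics: exists/columns are derived
    from the seeded schemas, and update_schema persists in memory so later
    lookups see the evolved schema."""

    def __init__(self, schemas=None):
        self._schemas = dict(schemas or {})

    def exists(self, ref: str) -> bool:
        return ref in self._schemas

    def columns(self, ref: str):
        # Column names are derived from the schema entries -- no second
        # seed dictionary needed.
        cols = self._schemas.get(ref)
        return tuple(name for name, _ in cols) if cols is not None else None

    def schema(self, ref: str):
        return self._schemas.get(ref)

    def update_schema(self, ref: str, schema) -> None:
        self._schemas[ref] = tuple(schema)


cat = MiniCatalog({"raw.orders": (("id", "int64"), ("amount", "float64"))})
assert cat.exists("raw.orders")
assert cat.columns("raw.orders") == ("id", "amount")
cat.update_schema("raw.orders", (("id", "int64"),))
assert cat.columns("raw.orders") == ("id",)
```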
- class loom.etl.testing.StubSourceReader(frames=None)[source]¶

  Bases: object

  In-memory SourceReader for tests.

  Returns pre-seeded frames keyed by source alias. Any alias not found returns None — tests should seed all expected aliases.

  Example:

      reader = StubSourceReader({"orders": pl.LazyFrame({"id": [1, 2]})})
      frame = reader.read(orders_spec, params)
      assert frame is not None
  - read(spec, _params_instance)[source]¶

    Return the pre-seeded frame for spec.alias, or None.

    - Parameters:
      spec (TableSourceSpec | FileSourceSpec | TempSourceSpec)
      _params_instance (Any)
- class loom.etl.testing.StubTargetWriter[source]¶

  Bases: object

  In-memory TargetWriter for tests.

  Captures all write calls in written for post-execution assertion.

  - written¶
    List of (frame, spec) pairs in write order.

  - streaming_flags¶
    Streaming flag values received for each write call.

  Example:

      writer = StubTargetWriter()
      writer.write(frame, spec, params)
      assert len(writer.written) == 1
      frame_out, spec_out = writer.written[0]
      assert isinstance(spec_out, ReplaceSpec)
  - write(frame, spec, _params_instance, *, streaming=False)[source]¶

    Capture the (frame, spec) pair for later assertion.

    - Parameters:
      frame (Any)
      spec (AppendSpec | ReplaceSpec | ReplacePartitionsSpec | ReplaceWhereSpec | UpsertSpec | HistorifySpec | FileSpec | TempSpec | TempFanInSpec)
      _params_instance (Any)
      streaming (bool)

    - Return type:
      None
- class loom.etl.testing.StubRunObserver[source]¶

  Bases: object

  In-memory ETLRunObserver for tests.

  Captures all lifecycle events as a flat list of (event_name, data) tuples. Helper properties provide quick access to common assertions.

  - events¶
    All events in order — (event_name, data_dict) tuples.

  Example:

      observer = StubRunObserver()
      executor = ETLExecutor(reader, writer, observers=[observer])
      executor.run_step(plan, params)
      assert observer.step_statuses == ["success"]
      assert "step_start" in observer.event_names
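A minimal sketch of the flat event-capture idea, assuming a simple recording method (the record() name and the event_names helper are illustrative stand-ins, not loom's exact observer API):

```python
class MiniObserver:
    """Toy event recorder mirroring the flat (event_name, data) capture
    described above. Only the capture-and-query shape is modelled."""

    def __init__(self):
        self.events = []  # ordered (event_name, data_dict) tuples

    def record(self, name, data):
        # Copy the payload so later mutation by the caller cannot
        # retroactively change what the test asserts on.
        self.events.append((name, dict(data)))

    @property
    def event_names(self):
        return [name for name, _ in self.events]


obs = MiniObserver()
obs.record("step_start", {"step": "double_amount"})
obs.record("step_end", {"status": "success"})
assert obs.event_names == ["step_start", "step_end"]
```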
- class loom.etl.testing.PolarsStepRunner[source]¶

  Bases: object

  In-memory test harness for Polars ETLStep subclasses.

  Seeds are plain Python tuples — no Polars dependency at definition time. Internally, each seed is converted to a polars.LazyFrame so execute() receives the same type as in production.

  No Delta I/O — reads and writes are captured in memory.

  Example:

      def test_double_amount(loom_polars_runner):
          loom_polars_runner.seed("raw.orders", [(1, 10.0), (2, 20.0)], ["id", "amount"])
          result = loom_polars_runner.run(DoubleAmountStep, NoParams())
          result.assert_count(2)
          result.assert_schema({"amount": LoomDtype.FLOAT64})
          result.show()
  - run(step_cls, params)[source]¶

    Compile and execute step_cls against the seeded tables.

    - Returns:
      StepResult for assertions.

    - Raises:
      KeyError – When a source table was not seeded.
      RuntimeError – When the step produced no output.

  - property target_spec: AppendSpec | ReplaceSpec | ReplacePartitionsSpec | ReplaceWhereSpec | UpsertSpec | HistorifySpec | FileSpec | TempSpec | TempFanInSpec¶

    Target spec from the last run() call.

    - Raises:
      RuntimeError – When run() has not been called yet.
- class loom.etl.testing.StepResult(_frame)[source]¶

  Bases: object

  Output of a PolarsStepRunner or SparkStepRunner run.

  Provides a backend-agnostic assertion surface. The internal frame is always a materialised polars.DataFrame — Spark results are converted before constructing this object.

  - Parameters:
    _frame (polars.DataFrame) – Materialised output frame.

  Example:

      result = runner.run(MyStep, params)
      result.assert_count(10)
      result.assert_schema({"id": LoomDtype.INT64, "amount": LoomDtype.FLOAT64})
      result.show()
  - to_polars()[source]¶

    Return the materialised output as a polars.DataFrame.

    - Return type:
      polars.DataFrame
  - assert_schema(expected)[source]¶

    Assert that the output columns match the expected Loom schema.

    - Parameters:
      expected (dict[str, LoomDtype]) – Mapping of column name → LoomDtype.

    - Raises:
      AssertionError – When a column is missing or has a different dtype.

    - Return type:
      None
  - assert_count(n)[source]¶

    Assert the result contains exactly n rows.

    - Parameters:
      n (int) – Expected row count.

    - Raises:
      AssertionError – When the row count differs.

    - Return type:
      None
  - assert_not_empty()[source]¶

    Assert the result contains at least one row.

    - Raises:
      AssertionError – When the result is empty.

    - Return type:
      None
- class loom.etl.testing.ETLScenario[source]¶

  Bases: object

  Named, reusable seed dataset for step runner tests.

  Stores input data as plain Python tuples — no backend dependency at definition time. Data is passed to the runner's seed() when apply() is called, so any StepRunnerProto-compatible runner works.

  Example:

      ORDERS = (
          ETLScenario()
          .with_table("raw.orders", [(1, 10.0), (2, 20.0)], ["id", "amount"])
      )

      def test_double_amount(loom_polars_runner):
          ORDERS.apply(loom_polars_runner)
          result = loom_polars_runner.run(DoubleAmountStep, NoParams())
          result.assert_count(2)
  - apply(runner)[source]¶

    Seed all tables into runner and return it.

    - Parameters:
      runner (StepRunnerProto) – Any StepRunnerProto-compatible runner.

    - Returns:
      The same runner for fluent chaining.
- class loom.etl.testing.StepRunnerProto(*args, **kwargs)[source]¶

  Bases: Protocol

  Protocol satisfied by both PolarsStepRunner and SparkStepRunner.

  Any object implementing seed() and run() is compatible with ETLScenario.
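As a rough illustration of how such a structural protocol behaves, the sketch below defines a runner protocol with assumed seed()/run() shapes (inferred from the examples above — not loom's real signatures) and checks a toy runner against it without any inheritance:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RunnerLike(Protocol):
    """Stand-in for StepRunnerProto; method signatures are assumptions."""

    def seed(self, table: str, rows: list, columns: list) -> None: ...
    def run(self, step_cls: type, params: Any) -> Any: ...


class FakeRunner:
    """Toy runner that satisfies RunnerLike purely by structure."""

    def __init__(self) -> None:
        self.tables = {}

    def seed(self, table: str, rows: list, columns: list) -> None:
        self.tables[table] = (rows, columns)

    def run(self, step_cls: type, params: Any) -> int:
        # A real runner would compile and execute the step; this toy one
        # just reports how many tables were seeded.
        return len(self.tables)


runner = FakeRunner()
runner.seed("raw.orders", [(1, 10.0)], ["id", "amount"])
assert isinstance(runner, RunnerLike)  # structural check, no subclassing
assert runner.run(object, None) == 1
```

Note that runtime_checkable isinstance checks only verify method presence, not signatures — which matches the duck-typed "implements seed() and run()" contract stated above.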