ETL Testing

loom.etl.testing provides in-memory harnesses for testing ETLStep subclasses without spinning up a real catalog or writing to Delta Lake.

Test a single step with Polars

PolarsStepRunner seeds source tables from plain Python tuples plus a list of column names, runs a single step in memory, and returns a StepResult for assertions.

from loom.etl.testing import PolarsStepRunner

def test_clean_orders():
    runner = PolarsStepRunner()
    runner.seed("raw.orders", [(1, 10.0), (2, -5.0)], ["id", "amount"])
    result = runner.run(CleanOrdersStep, NoParams())

    result.assert_count(1)
    result.assert_schema({"id": "Int64", "amount": "Float64"})

StepResult methods: assert_schema, assert_count, assert_not_empty, show, to_polars.
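
For value-level checks, to_polars hands the step output back for inspection with the regular Polars API. A short sketch building on the CleanOrdersStep example above, assuming to_polars returns a Polars DataFrame and show prints the result:

def test_clean_orders_drops_negative_amounts():
    runner = PolarsStepRunner()
    runner.seed("raw.orders", [(1, 10.0), (2, -5.0)], ["id", "amount"])
    result = runner.run(CleanOrdersStep, NoParams())

    result.assert_not_empty()
    result.show()                    # print the output rows while debugging
    df = result.to_polars()          # assumed to return a polars.DataFrame
    assert df["amount"].min() >= 0   # the negative-amount row was filtered out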

Reusable seed datasets with ETLScenario

ETLScenario stores seed data as plain tuples so the same dataset can be used with any backend runner.

from loom.etl.testing import ETLScenario

ORDERS = (
    ETLScenario()
    .with_table("raw.orders", [(1, 10.0), (2, 20.0)], ["id", "amount"])
)

def test_double_amount(loom_polars_runner):
    # loom_polars_runner is the auto-registered pytest fixture described below.
    ORDERS.apply(loom_polars_runner)
    result = loom_polars_runner.run(DoubleAmountStep, NoParams())
    result.assert_count(2)

Pytest fixtures auto-registered: loom_polars_runner, loom_spark_runner.
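
Because the scenario holds only plain tuples, the same ORDERS dataset can be replayed against the Spark backend simply by swapping the fixture. A minimal sketch, assuming DoubleAmountStep yields the same row count on both backends:

def test_double_amount_on_spark(loom_spark_runner):
    ORDERS.apply(loom_spark_runner)
    result = loom_spark_runner.run(DoubleAmountStep, NoParams())
    result.assert_count(2)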

Spark integration tests

Use SparkTestSession for a real PySpark + Delta session:

from loom.etl.testing.spark import SparkTestSession
from loom.etl.testing.spark import SparkStepRunner  # assumed import path for the Spark runner

with SparkTestSession.start() as spark:
    runner = SparkStepRunner(spark)
    runner.seed("raw.orders", [(1, 10.0)], ["id", "amount"])
    result = runner.run(CleanOrdersStep, NoParams())
    result.assert_count(1)

Compile-time stubs

  • StubCatalog — in-memory catalog.

  • StubSourceReader / StubTargetWriter — in-memory I/O.

  • StubRunObserver — records pipeline lifecycle events.
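
A loose sketch of how these stubs might be combined in a pipeline-level unit test. The build_pipeline factory, the constructor signatures, and the events attribute on StubRunObserver are illustrative assumptions, not documented API:

observer = StubRunObserver()

# Hypothetical wiring: inject the in-memory stubs in place of the real
# catalog, reader, and writer, then run the pipeline under test.
pipeline = build_pipeline(
    catalog=StubCatalog(),
    reader=StubSourceReader(),
    writer=StubTargetWriter(),
    observer=observer,
)
pipeline.run()

# The observer should have recorded the lifecycle events emitted during the run.
assert observer.events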