loom.etl.testing.spark

Spark testing utilities and pytest fixtures for the Loom ETL framework.

Provides SparkTestSession, a context manager that handles Spark configuration, Delta extension setup, and teardown, plus SparkStepRunner, an in-memory harness for testing ETLStep subclasses.

The loom_spark_session pytest fixture is registered automatically via the pytest11 entry point when loom-kernel[etl-spark] is installed — no explicit import or pytest_plugins declaration is needed.
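A test module can therefore request the fixture with no local setup at all. A minimal sketch (the data and assertion are illustrative):

def test_fixture_is_auto_registered(loom_spark_session):
    # Resolved by name via the pytest11 entry point; no import or
    # pytest_plugins declaration needed.
    df = loom_spark_session.createDataFrame([(1, "a")], ["id", "label"])
    assert df.count() == 1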

Usage in conftest.py:

import pytest
from loom.etl.testing.spark import SparkTestSession, SparkStepRunner

@pytest.fixture(scope="session")
def spark():
    # One JVM for the whole run; the context manager guarantees
    # spark.stop() on teardown.
    with SparkTestSession.start(app="my-tests", parallelism=1) as session:
        yield session

@pytest.fixture
def step_runner(spark):
    # Function-scoped: each test gets a fresh runner with no leftover seeds.
    return SparkStepRunner(spark)

Functions

_delta_spark_version()
_pick_jar(jar_dir, prefix, expected_version)
_resolve_ivy_dir(ivy_dir)
_resolve_local_delta_jars()
_sandbox_network_disabled()
_spark_frame_to_polars(frame)

Internal helpers (underscore-prefixed); not part of the public API.

loom_spark_runner(loom_spark_session)

Fresh SparkStepRunner per test — no Delta I/O, in-memory only.

loom_spark_session()

Local SparkSession with Delta Lake extensions.

Classes

SparkStepRunner(spark)

In-memory test harness for Spark ETLStep subclasses.

SparkTestSession(session)

Context manager for a local PySpark + Delta test session.

_SparkCapturingWriter()
_SparkStubReader(frames)

Internal, as with the underscore-prefixed functions above.

class loom.etl.testing.spark.SparkTestSession(session)

Bases: object

Context manager for a local PySpark + Delta test session.

Handles Delta extension configuration and clean teardown.

Use as a context manager to guarantee spark.stop() is always called:

with SparkTestSession.start(app="etl-tests") as spark:
    df = spark.createDataFrame([(1, "a")], ["id", "label"])
Parameters:

session (SparkSession) – Active pyspark.sql.SparkSession.

classmethod start(*, app='loom-etl-test', parallelism=1, memory='1g', ivy_dir=None)

Create and return a configured local SparkSession.

Parameters:
  • app (str) – Spark application name shown in the UI.

  • parallelism (int) – spark.sql.shuffle.partitions — keep low (1–2) for unit tests to reduce overhead.

  • memory (str) – Driver heap size string (e.g. "1g").

  • ivy_dir (str | Path | None) – Optional Ivy cache directory for Maven package resolution. When omitted in a constrained sandbox, a writable temp directory is selected automatically.

Returns:

A SparkTestSession wrapping the active session.

Return type:

SparkTestSession
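A sketch of a non-default configuration; the concrete values (including the ivy_dir path) are illustrative:

from loom.etl.testing.spark import SparkTestSession

with SparkTestSession.start(
    app="etl-tests",
    parallelism=2,           # spark.sql.shuffle.partitions
    memory="2g",             # driver heap
    ivy_dir=".ivy-cache",    # reuse resolved jars across runs
) as spark:
    assert spark.range(10).count() == 10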

property session: pyspark.sql.SparkSession

The underlying pyspark.sql.SparkSession.
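When the object is held directly rather than entered as a context manager, the property exposes the raw session; in that case stop it explicitly. A sketch:

from loom.etl.testing.spark import SparkTestSession

ts = SparkTestSession.start()
try:
    ts.session.sql("SELECT 1 AS x").show()
finally:
    ts.session.stop()  # the with-statement form does this automatically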

class loom.etl.testing.spark.SparkStepRunner(spark)

Bases: object

In-memory test harness for Spark ETLStep subclasses.

Seeds are plain Python tuples — no Spark dependency at definition time. Internally, each seed is converted to a pyspark.sql.DataFrame so execute() receives the same type as in production.

No Delta I/O — reads and writes are captured in memory.

Example:

def test_aggregate(loom_spark_runner):
    loom_spark_runner.seed("raw.orders", [(1, 10.0)], ["id", "amount"])
    result = loom_spark_runner.run(AggregateStep, NoParams())
    result.assert_count(1)
    result.show()
Parameters:

spark (SparkSession)

seed(ref, data, columns)

Register raw data under the logical table reference ref.

Parameters:
  • ref (str) – Logical table reference, e.g. "raw.orders".

  • data (list[tuple[Any, ...]]) – Row data as a list of tuples.

  • columns (list[str]) – Column names aligned with the tuple positions.

Returns:

self for fluent chaining.

Return type:

SparkStepRunner
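Because seed() returns the runner, multiple tables can be registered in one fluent chain (table names and rows here are illustrative):

def test_seed_chaining(loom_spark_runner):
    (loom_spark_runner
        .seed("raw.orders", [(1, 10.0)], ["id", "amount"])
        .seed("raw.customers", [(1, "acme")], ["id", "name"]))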

run(step_cls, params)

Compile and execute step_cls against the seeded tables.

Parameters:
  • step_cls (type[Any]) – ETLStep subclass to execute.

  • params (Any) – Concrete params instance for this run.

Returns:

StepResult for assertions.

Raises:
  • KeyError – When a source table was not seeded.

  • RuntimeError – When the step produced no output.

Return type:

StepResult
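The KeyError makes incomplete setups fail fast; pytest.raises turns that into an explicit expectation. A sketch, reusing the placeholder AggregateStep and NoParams from the class example above:

import pytest

def test_unseeded_source_raises(loom_spark_runner):
    # AggregateStep reads raw.orders, which is deliberately not seeded here.
    with pytest.raises(KeyError):
        loom_spark_runner.run(AggregateStep, NoParams())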

property target_spec: AppendSpec | ReplaceSpec | ReplacePartitionsSpec | ReplaceWhereSpec | UpsertSpec | HistorifySpec | FileSpec | TempSpec | TempFanInSpec

Target spec from the last run() call.

Raises:

RuntimeError – When run() has not been called yet.
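A sketch covering both documented behaviours (again with the placeholder AggregateStep and NoParams):

import pytest

def test_target_spec_guard(loom_spark_runner):
    # Documented: RuntimeError before the first run().
    with pytest.raises(RuntimeError):
        _ = loom_spark_runner.target_spec
    loom_spark_runner.seed("raw.orders", [(1, 10.0)], ["id", "amount"])
    loom_spark_runner.run(AggregateStep, NoParams())
    # The concrete spec type depends on how the step declares its target.
    assert loom_spark_runner.target_spec is not None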

loom.etl.testing.spark.loom_spark_session()

Local SparkSession with Delta Lake extensions.

Scoped to the test session to amortise the ~5 s JVM startup cost.

Return type:

Generator[pyspark.sql.SparkSession, None, None]

loom.etl.testing.spark.loom_spark_runner(loom_spark_session)

Fresh SparkStepRunner per test — no Delta I/O, in-memory only.

Depends on the session-scoped loom_spark_session fixture above: the JVM is shared across tests, while the runner itself is rebuilt fresh for each one.

Parameters:

loom_spark_session (pyspark.sql.SparkSession)

Return type:

SparkStepRunner
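Per-test freshness means seeds never leak between tests. A sketch (AggregateStep and NoParams are the placeholders from the SparkStepRunner example):

import pytest

def test_one(loom_spark_runner):
    loom_spark_runner.seed("raw.orders", [(1, 10.0)], ["id", "amount"])

def test_two(loom_spark_runner):
    # A fresh runner: test_one's seed is gone, so reading raw.orders
    # fails fast with KeyError.
    with pytest.raises(KeyError):
        loom_spark_runner.run(AggregateStep, NoParams())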