loom.etl.testing.spark¶
Spark testing utilities and pytest fixtures for the Loom ETL framework.
Provides SparkTestSession — a context manager that handles Spark
configuration, Delta extension setup, and teardown.
The loom_spark_session pytest fixture is registered automatically via the
pytest11 entry point when loom-kernel[etl-spark] is installed — no
explicit import or pytest_plugins declaration is needed.
Usage in conftest.py:

    import pytest
    from loom.etl.testing.spark import SparkTestSession, SparkStepRunner

    @pytest.fixture(scope="session")
    def spark():
        with SparkTestSession.start(app="my-tests", parallelism=1) as session:
            yield session

    @pytest.fixture
    def step_runner(spark):
        return SparkStepRunner(spark)
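Because the loom_spark_session fixture is registered through the pytest11 entry point, a test can also request it with no conftest.py at all. A minimal sketch (test name and data are illustrative):

    def test_row_count(loom_spark_session):
        df = loom_spark_session.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
        assert df.count() == 2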
Functions

loom_spark_runner(loom_spark_session)
    Fresh SparkStepRunner per test — no Delta I/O, in-memory only.

loom_spark_session()
    Local SparkSession with Delta Lake extensions.
Classes

SparkStepRunner(spark)
    In-memory test harness for Spark ETLStep subclasses.

SparkTestSession(session)
    Context manager for a local PySpark + Delta test session.
- class loom.etl.testing.spark.SparkTestSession(session)[source]¶
Bases: object

Context manager for a local PySpark + Delta test session.

Handles Delta extension configuration and clean teardown.

Use as a context manager to guarantee spark.stop() is always called:

    with SparkTestSession.start(app="etl-tests") as spark:
        df = spark.createDataFrame([(1, "a")], ["id", "label"])
- Parameters:
    session (SparkSession) – Active pyspark.sql.SparkSession.
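As a sketch, an already-active session can be wrapped directly for guaranteed teardown; the builder configuration below is illustrative, and start() remains the usual entry point since it also applies the Delta configuration for you:

    from pyspark.sql import SparkSession

    raw = SparkSession.builder.master("local[1]").appName("manual").getOrCreate()
    with SparkTestSession(raw) as spark:
        spark.createDataFrame([(1,)], ["id"]).show()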
- classmethod start(*, app='loom-etl-test', parallelism=1, memory='1g', ivy_dir=None)[source]¶
Create a configured local SparkSession and return it wrapped in a SparkTestSession.

- Parameters:
    app (str) – Spark application name shown in the UI.
    parallelism (int) – spark.sql.shuffle.partitions — keep low (1–2) for unit tests to reduce overhead.
    memory (str) – Driver heap size string (e.g. "1g").
    ivy_dir (str | Path | None) – Optional Ivy cache directory for Maven package resolution. When omitted in a constrained sandbox, a writable temp directory is selected automatically.
- Returns:
    A SparkTestSession wrapping the active session.
- Return type:
    SparkTestSession
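A minimal sketch of calling start() outside a fixture; the app name, settings, and temp Ivy path are illustrative:

    import tempfile

    with SparkTestSession.start(
        app="quick-check",
        parallelism=2,
        memory="2g",
        ivy_dir=tempfile.mkdtemp(),
    ) as spark:
        spark.sql("SELECT 1 AS ok").show()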
- property session: pyspark.sql.SparkSession¶
The underlying pyspark.sql.SparkSession.
- class loom.etl.testing.spark.SparkStepRunner(spark)[source]¶
Bases: object

In-memory test harness for Spark ETLStep subclasses.

Seeds are plain Python tuples — no Spark dependency at definition time. Internally, each seed is converted to a pyspark.sql.DataFrame so execute() receives the same type as in production.

No Delta I/O — reads and writes are captured in memory.
Example:

    def test_aggregate(loom_spark_runner):
        loom_spark_runner.seed("raw.orders", [(1, 10.0)], ["id", "amount"])
        result = loom_spark_runner.run(AggregateStep, NoParams())
        result.assert_count(1)
        result.show()
- Parameters:
    spark (SparkSession)
- run(step_cls, params)[source]¶
Compile and execute step_cls against the seeded tables.
- Parameters:
    step_cls – The ETLStep subclass to compile and execute.
    params – Parameters passed to the step.
- Returns:
    StepResult for assertions.
- Raises:
    KeyError – When a source table was not seeded.
    RuntimeError – When the step produced no output.
- Return type:
    StepResult
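A hedged sketch of the KeyError path described above, reusing the AggregateStep and NoParams names from the example (they stand in for any step whose source table is left unseeded):

    import pytest

    def test_unseeded_source_raises(loom_spark_runner):
        # No seed() call for the step's source table, so run() raises KeyError.
        with pytest.raises(KeyError):
            loom_spark_runner.run(AggregateStep, NoParams())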
- property target_spec: AppendSpec | ReplaceSpec | ReplacePartitionsSpec | ReplaceWhereSpec | UpsertSpec | HistorifySpec | FileSpec | TempSpec | TempFanInSpec¶
Target spec from the last run() call.

- Raises:
    RuntimeError – When run() has not been called yet.
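A sketch of inspecting target_spec after a run; the expectation that AggregateStep declares an append target is an illustrative assumption, not part of the documented API:

    result = loom_spark_runner.run(AggregateStep, NoParams())
    # Illustrative assumption: AggregateStep appends to its target.
    assert isinstance(loom_spark_runner.target_spec, AppendSpec)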
- loom.etl.testing.spark.loom_spark_session()[source]¶
Local SparkSession with Delta Lake extensions.
Scoped to the test session to amortise the ~5 s JVM startup cost.
- Return type:
    Generator[pyspark.sql.SparkSession, None, None]
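Roughly what this fixture does, assuming it simply delegates to SparkTestSession.start (the exact arguments are not documented here and are assumptions):

    import pytest

    @pytest.fixture(scope="session")
    def loom_spark_session():
        with SparkTestSession.start(app="loom-etl-test") as session:
            yield session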
- loom.etl.testing.spark.loom_spark_runner(loom_spark_session)[source]¶
Fresh SparkStepRunner per test — no Delta I/O, in-memory only.

Depends on loom_spark_session — the SparkSession is scoped to the test session to amortise the ~5 s JVM startup cost.

- Parameters:
    loom_spark_session (pyspark.sql.SparkSession)
- Return type:
    SparkStepRunner
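An equivalent sketch of this fixture, assuming it constructs a fresh runner around the shared session:

    import pytest

    @pytest.fixture
    def loom_spark_runner(loom_spark_session):
        return SparkStepRunner(loom_spark_session)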