loom.etl.runner

Runner API for compiling and executing ETL pipelines.

class loom.etl.runner.ETLRunner(reader, writer, observability=None, dispatcher=None, checkpoint_store=None)

Bases: object

Wire storage config, backend I/O, and the executor into a single entry point.

Parameters:
  • reader (SourceReader) – Source reader implementation.

  • writer (TargetWriter) – Target writer implementation.

  • observers – Lifecycle observers wrapped in a composite observer.

  • dispatcher (ParallelDispatcher | None) – Parallel task dispatcher.

  • observability (ObservabilityRuntime | None)

  • checkpoint_store (CheckpointStore | None)

classmethod from_config(config, obs_config=None, *, spark=None, dispatcher=None, cleaner=None, extra_observers=None)

Build an ETLRunner from resolved config objects.

Parameters:
  • extra_observers (Sequence[LifecycleObserver] | None) – Optional additional LifecycleObserver instances appended after the config-derived ones. Use this to inject orchestrator-specific observers (e.g. the Prefect TaskRun observer) without subclassing the runner.

  • config (StorageConfig)

  • obs_config (ETLObservabilityConfig | None)

  • spark (SparkSession | None)

  • dispatcher (ParallelDispatcher | None)

  • cleaner (TempCleaner | None)

Return type:

ETLRunner

classmethod from_yaml(path, *, spark=None, dispatcher=None, extra_observers=None)

Load config from a YAML file and build an ETLRunner.

Parameters:
  • extra_observers (Sequence[LifecycleObserver] | None) – Optional additional LifecycleObserver instances forwarded to from_config().

  • path (str)

  • spark (SparkSession | None)

  • dispatcher (ParallelDispatcher | None)

Return type:

ETLRunner

classmethod from_spark(spark, obs_config=None, *, dispatcher=None, cleaner=None)

Build an ETLRunner using Spark as the execution engine.

Parameters:
  • spark (SparkSession)

  • obs_config (ETLObservabilityConfig | None)

  • dispatcher (ParallelDispatcher | None)

  • cleaner (TempCleaner | None)

Return type:

ETLRunner

classmethod from_dict(storage, observability=None, *, spark=None, dispatcher=None, cleaner=None)

Build an ETLRunner from pre-resolved plain Python dicts.

Parameters:
  • storage (dict[str, Any])

  • observability (dict[str, Any] | None)

  • spark (SparkSession | None)

  • dispatcher (ParallelDispatcher | None)

  • cleaner (TempCleaner | None)

Return type:

ETLRunner

run(pipeline, params, *, include=None, correlation_id=None, run_id=None, attempt=1, last_attempt=True)

Compile the pipeline, optionally filter its steps by include, and execute it.

Parameters:
  • pipeline (type[ETLPipeline[Any]]) – ETLPipeline class to compile and execute.

  • params (Any) – ETLParams instance for this run.

  • include (Sequence[str] | None) – If set, only steps or processes whose names appear in this sequence run. InvalidStageError is raised when none of the names match the compiled plan.

  • correlation_id (str | None) – Logical business unit identifier, stable across retries.

  • run_id (str | None) – Execution-specific identifier for lineage records. If None, a random UUID is generated. Callers that share a traceability identifier with an orchestrator (e.g. Prefect) should pass it explicitly so lineage and orchestrator records align.

  • attempt (int) – Current attempt number (1-based).

  • last_attempt (bool) – Whether this is the final allowed attempt.

Return type:

None
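The interplay of correlation_id, run_id, attempt, and last_attempt can be sketched with a small retry wrapper. This is an illustration rather than part of the library: run_with_retries and max_attempts are hypothetical names, the runner argument is any object exposing the run() signature documented above, and two choices are assumptions (a fresh run_id per attempt, and retrying on any exception).

```python
import uuid
from typing import Any, List, Optional, Sequence


def run_with_retries(
    runner: Any,
    pipeline: Any,
    params: Any,
    *,
    correlation_id: str,
    max_attempts: int = 3,
    include: Optional[Sequence[str]] = None,
) -> List[str]:
    """Call runner.run() until it succeeds or max_attempts is exhausted.

    Returns the run_id used for each attempt, in order.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    run_ids: List[str] = []
    for attempt in range(1, max_attempts + 1):
        # Assumption: each attempt is a separate execution, so it gets its
        # own run_id, while correlation_id stays fixed across retries.
        run_id = str(uuid.uuid4())
        run_ids.append(run_id)
        is_last = attempt == max_attempts
        try:
            runner.run(
                pipeline,
                params,
                include=include,
                correlation_id=correlation_id,
                run_id=run_id,
                attempt=attempt,          # 1-based, as documented
                last_attempt=is_last,     # True only on the final try
            )
            return run_ids
        except Exception:
            # Assumption: every failure is retryable. Narrow this in real use.
            if is_last:
                raise
    return run_ids
```

When an orchestrator already supplies an execution identifier, passing that value as run_id instead of a generated UUID keeps lineage and orchestrator records aligned, as the run_id description notes.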

flush()

Flush buffered ETL observability sinks after a run.

Return type:

None
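Because flush() is meant to be called after a run, one way to make sure buffered records are not lost is a try/finally wrapper. The sketch below is a hypothetical helper (run_then_flush is not part of the library), and flushing after a failed run as well as a successful one is an assumption about the desired behaviour.

```python
from typing import Any


def run_then_flush(runner: Any, pipeline: Any, params: Any, **run_kwargs: Any) -> None:
    """Execute one run and always flush observability sinks afterwards."""
    try:
        # Keyword arguments (include, correlation_id, run_id, ...) are
        # forwarded unchanged to the documented run() method.
        runner.run(pipeline, params, **run_kwargs)
    finally:
        # Assumption: records buffered during a failed run should be
        # flushed too, so flush() sits in the finally clause.
        runner.flush()
```

Any exception raised by run() still propagates to the caller; the wrapper only guarantees that flush() runs first.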

cleanup_correlation(correlation_id)

Remove all CORRELATION-scope intermediates for correlation_id.

Parameters:

correlation_id (str)

Return type:

None
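A possible usage pattern, sketched under assumptions: several pipelines run under one correlation_id, and the CORRELATION-scope intermediates are removed once the whole unit has succeeded. The helper name run_correlated and the jobs argument are hypothetical, and skipping cleanup on failure (so that a retry could reuse intermediates) is an assumption, not documented behaviour.

```python
from typing import Any, Iterable, Tuple


def run_correlated(
    runner: Any,
    jobs: Iterable[Tuple[Any, Any]],
    correlation_id: str,
) -> None:
    """Run (pipeline, params) pairs under one correlation_id, then clean up."""
    for pipeline, params in jobs:
        # Every run shares the same logical business unit identifier.
        runner.run(pipeline, params, correlation_id=correlation_id)

    # Reached only if every run above succeeded. Assumption: on failure the
    # intermediates are left in place for a later retry to decide about.
    runner.cleanup_correlation(correlation_id)
```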

exception loom.etl.runner.InvalidStageError(include)

Bases: ValueError

Raised when include matches no step or process in the compiled plan.

Parameters:

include (frozenset[str]) – The set of names that produced no match.

Return type:

None
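The following sketch shows one way a caller might handle an include filter that matches nothing. The helper run_selected is hypothetical. So that the snippet runs without the package installed, it falls back to a stand-in exception that mirrors only what is documented above (a ValueError subclass constructed from a frozenset of names); the stand-in's message text is invented for illustration.

```python
from typing import Any, FrozenSet, Sequence

try:
    from loom.etl.runner import InvalidStageError
except ImportError:
    # Stand-in for illustration only; mirrors the documented base class
    # and constructor argument, nothing more.
    class InvalidStageError(ValueError):
        def __init__(self, include: FrozenSet[str]) -> None:
            super().__init__(f"include matched no step or process: {sorted(include)}")


def run_selected(runner: Any, pipeline: Any, params: Any, include: Sequence[str]) -> bool:
    """Run only the named steps; return False if the filter matched nothing."""
    try:
        runner.run(pipeline, params, include=include)
    except InvalidStageError as exc:
        # Catch the specific type before any broader ValueError handler,
        # because InvalidStageError is itself a ValueError.
        print(f"Nothing to run: {exc}")
        return False
    return True
```

Since the exception derives from ValueError, an existing `except ValueError` clause would also catch it; placing the more specific handler first keeps the two cases distinguishable.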