loom.etl.runner
Runner API for compiling and executing ETL pipelines.
- class loom.etl.runner.ETLRunner(reader, writer, observability=None, dispatcher=None, checkpoint_store=None)
Bases: object
Wire storage config, backend I/O, and executor into one callable.
- Parameters:
reader (SourceReader) – Source reader implementation.
writer (TargetWriter) – Target writer implementation.
observers – Lifecycle observers wrapped in a composite observer.
dispatcher (ParallelDispatcher | None) – Parallel task dispatcher.
observability (ObservabilityRuntime | None)
checkpoint_store (CheckpointStore | None)
- classmethod from_config(config, obs_config=None, *, spark=None, dispatcher=None, cleaner=None, extra_observers=None)
Build an ETLRunner from resolved config objects.
- Parameters:
extra_observers (Sequence[LifecycleObserver] | None) – Optional additional LifecycleObserver instances appended after the config-derived ones. Use this to inject orchestrator-specific observers (e.g. the Prefect TaskRun observer) without subclassing the runner.
config (StorageConfig)
obs_config (ETLObservabilityConfig | None)
spark (SparkSession | None)
dispatcher (ParallelDispatcher | None)
cleaner (TempCleaner | None)
- Return type:
ETLRunner
- classmethod from_yaml(path, *, spark=None, dispatcher=None, extra_observers=None)
Load config from a YAML file and build an ETLRunner.
- Parameters:
extra_observers (Sequence[LifecycleObserver] | None) – Optional additional LifecycleObserver instances forwarded to from_config().
path (str)
spark (SparkSession | None)
dispatcher (ParallelDispatcher | None)
- Return type:
ETLRunner
- classmethod from_spark(spark, obs_config=None, *, dispatcher=None, cleaner=None)
Build an ETLRunner using Spark as the execution engine.
- Parameters:
spark (SparkSession)
obs_config (ETLObservabilityConfig | None)
dispatcher (ParallelDispatcher | None)
cleaner (TempCleaner | None)
- Return type:
ETLRunner
- classmethod from_dict(storage, observability=None, *, spark=None, dispatcher=None, cleaner=None)
Build an ETLRunner from pre-resolved plain Python dicts.
- run(pipeline, params, *, include=None, correlation_id=None, run_id=None, attempt=1, last_attempt=True)
Compile pipeline, optionally filter its steps with include, and execute it.
- Parameters:
pipeline (type[ETLPipeline[Any]]) – ETLPipeline class to compile and execute.
params (Any) – ETLParams instance for this run.
include (Sequence[str] | None) – If set, only steps whose names are in this sequence run.
correlation_id (str | None) – Logical business unit identifier, stable across retries.
run_id (str | None) – Execution-specific identifier for lineage records. If None, a random UUID is generated. Callers that share a traceability identifier with an orchestrator (e.g. Prefect) should pass it explicitly so lineage and orchestrator records align.
attempt (int) – Current attempt number (1-based).
last_attempt (bool) – Whether this is the final allowed attempt.
- Return type:
None
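The retry-related parameters of run() (correlation_id, run_id, attempt, last_attempt) can be driven from a caller-side retry loop. The following is a minimal sketch, not part of loom: run_with_retries and FlakyRunner are hypothetical names, FlakyRunner only mimics the keyword arguments of run() documented above, and issuing a fresh run_id per attempt is an assumption (an orchestrator-provided identifier could be passed instead).

```python
import uuid
from typing import Any, Callable


def run_with_retries(
    run: Callable[..., None],
    pipeline: Any,
    params: Any,
    *,
    correlation_id: str,
    max_attempts: int = 3,
) -> int:
    """Call ``run`` until it succeeds; return the 1-based attempt that succeeded."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            run(
                pipeline,
                params,
                correlation_id=correlation_id,      # stable across retries
                run_id=str(uuid.uuid4()),           # assumption: new id per attempt
                attempt=attempt,                    # 1-based
                last_attempt=attempt == max_attempts,
            )
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise  # no attempts left, surface the failure
    raise AssertionError("unreachable")


class FlakyRunner:
    """Hypothetical stand-in for ETLRunner that fails until ``succeed_on``."""

    def __init__(self, succeed_on: int) -> None:
        self.succeed_on = succeed_on
        self.calls: list = []

    def run(self, pipeline, params, *, include=None, correlation_id=None,
            run_id=None, attempt=1, last_attempt=True) -> None:
        # Record the keyword arguments so the retry behaviour can be inspected.
        self.calls.append({
            "correlation_id": correlation_id,
            "run_id": run_id,
            "attempt": attempt,
            "last_attempt": last_attempt,
        })
        if attempt < self.succeed_on:
            raise RuntimeError("transient failure")


runner = FlakyRunner(succeed_on=3)
winning_attempt = run_with_retries(
    runner.run, object, {"day": "2024-01-01"},
    correlation_id="orders-2024-01-01",
)
```

With a real runner, runner.run would be the bound run method of an ETLRunner instance, and pipeline and params would be an ETLPipeline class and its ETLParams instance.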
- exception loom.etl.runner.InvalidStageError(include)
Bases: ValueError
Raised when include matches no step or process in the compiled plan.
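The interaction between include and InvalidStageError can be illustrated with a small stand-alone sketch. It is not loom's implementation: the InvalidStageError class below is a local stand-in with the same base class, filter_steps is a hypothetical helper, the plan is reduced to a flat list of step names (processes are not modeled), and raising only when nothing at all matches is an assumption drawn from the description above.

```python
from typing import List, Optional, Sequence


class InvalidStageError(ValueError):
    """Local stand-in for loom.etl.runner.InvalidStageError (illustrative only)."""

    def __init__(self, include: Sequence[str]) -> None:
        super().__init__(f"include={list(include)!r} matches no step in the plan")
        self.include = include


def filter_steps(plan: Sequence[str], include: Optional[Sequence[str]]) -> List[str]:
    """Return the step names to run, keeping the plan's original order."""
    if include is None:
        return list(plan)  # no filter: run every step
    wanted = set(include)
    selected = [name for name in plan if name in wanted]
    if not selected:
        # Assumption: an error is raised only when nothing matches.
        raise InvalidStageError(include)
    return selected


plan = ["extract", "transform", "load"]  # hypothetical step names
everything = filter_steps(plan, None)
subset = filter_steps(plan, ["load", "extract"])

try:
    filter_steps(plan, ["missing"])
except InvalidStageError as exc:
    caught = exc
```

Because the exception derives from ValueError, callers that already handle ValueError around run() would also catch it.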