"""ETLRunner — single entry point that wires config, compiler, and executor."""
from __future__ import annotations
import logging
import uuid
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
import msgspec
if TYPE_CHECKING:
from pyspark.sql import SparkSession
from loom.core.observability.protocol import LifecycleObserver
from loom.core.observability.runtime import ObservabilityRuntime
from loom.core.runner import flush_runner
from loom.etl.checkpoint import CheckpointStore, TempCleaner
from loom.etl.compiler import ETLCompiler
from loom.etl.executor import ETLExecutor, ParallelDispatcher
from loom.etl.lineage._config import ETLObservabilityConfig
from loom.etl.lineage._observer import LineageObserver
from loom.etl.lineage._records import RunContext
from loom.etl.pipeline._pipeline import ETLPipeline
from loom.etl.runner._wiring import (
make_backends,
make_checkpoint_store,
make_lineage_store,
)
from loom.etl.runner.config_loader import _load_yaml
from loom.etl.runner.errors import InvalidStageError
from loom.etl.runner.filtering import _filter_plan
from loom.etl.runtime.contracts import SourceReader, TargetWriter
from loom.etl.storage._config import StorageConfig, convert_storage_config
_log = logging.getLogger(__name__)
[docs]
class ETLRunner:
"""Wire storage config, backend I/O, and executor into one callable.
Args:
reader: Source reader implementation.
writer: Target writer implementation.
observers: Lifecycle observers wrapped in a composite observer.
dispatcher: Parallel task dispatcher.
"""
def __init__(
self,
reader: SourceReader,
writer: TargetWriter,
observability: ObservabilityRuntime | None = None,
dispatcher: ParallelDispatcher | None = None,
checkpoint_store: CheckpointStore | None = None,
) -> None:
self._executor = ETLExecutor(
reader,
writer,
observability or ObservabilityRuntime.noop(),
dispatcher,
checkpoint_store,
)
self._compiler = ETLCompiler()
self._checkpoint_store = checkpoint_store
[docs]
@classmethod
def from_config(
cls,
config: StorageConfig,
obs_config: ETLObservabilityConfig | None = None,
*,
spark: SparkSession | None = None,
dispatcher: ParallelDispatcher | None = None,
cleaner: TempCleaner | None = None,
extra_observers: Sequence[LifecycleObserver] | None = None,
) -> ETLRunner:
"""Build an :class:`ETLRunner` from resolved config objects.
Args:
extra_observers: Optional additional :class:`LifecycleObserver`
instances appended after the config-derived ones. Use this
to inject orchestrator-specific observers (e.g. the Prefect
TaskRun observer) without subclassing the runner.
"""
resolved_obs_config = obs_config or ETLObservabilityConfig()
reader, writer = make_backends(config, spark)
observability = ObservabilityRuntime.from_config(resolved_obs_config)
if resolved_obs_config.lineage.enabled:
lineage_store = make_lineage_store(config, resolved_obs_config.lineage, spark)
if lineage_store is not None:
observability = ObservabilityRuntime(
[*observability.observers, LineageObserver(lineage_store)]
)
if extra_observers:
observability = ObservabilityRuntime([*observability.observers, *extra_observers])
checkpoint_store = make_checkpoint_store(config, spark, cleaner)
return cls(reader, writer, observability, dispatcher, checkpoint_store)
[docs]
@classmethod
def from_yaml(
cls,
path: str,
*,
spark: SparkSession | None = None,
dispatcher: ParallelDispatcher | None = None,
extra_observers: Sequence[LifecycleObserver] | None = None,
) -> ETLRunner:
"""Load config from a YAML file and build an :class:`ETLRunner`.
Args:
extra_observers: Optional additional :class:`LifecycleObserver`
instances forwarded to :meth:`from_config`.
"""
_log.debug("load yaml path=%s", path)
storage_config, obs_config = _load_yaml(path)
storage_config.validate()
return cls.from_config(
storage_config,
obs_config,
spark=spark,
dispatcher=dispatcher,
extra_observers=extra_observers,
)
[docs]
@classmethod
def from_spark(
cls,
spark: SparkSession,
obs_config: ETLObservabilityConfig | None = None,
*,
dispatcher: ParallelDispatcher | None = None,
cleaner: TempCleaner | None = None,
) -> ETLRunner:
"""Build an :class:`ETLRunner` using Spark as the execution engine."""
config = StorageConfig(engine="spark")
return cls.from_config(
config, obs_config, spark=spark, dispatcher=dispatcher, cleaner=cleaner
)
[docs]
@classmethod
def from_dict(
cls,
storage: dict[str, Any],
observability: dict[str, Any] | None = None,
*,
spark: SparkSession | None = None,
dispatcher: ParallelDispatcher | None = None,
cleaner: TempCleaner | None = None,
) -> ETLRunner:
"""Build an :class:`ETLRunner` from pre-resolved plain Python dicts."""
storage_config = convert_storage_config(storage)
storage_config.validate()
obs_config = (
msgspec.convert(observability, ETLObservabilityConfig)
if observability is not None
else ETLObservabilityConfig()
)
return cls.from_config(
storage_config, obs_config, spark=spark, dispatcher=dispatcher, cleaner=cleaner
)
[docs]
def run(
self,
pipeline: type[ETLPipeline[Any]],
params: Any,
*,
include: Sequence[str] | None = None,
correlation_id: str | None = None,
run_id: str | None = None,
attempt: int = 1,
last_attempt: bool = True,
) -> None:
"""Compile, optionally filter, and execute *pipeline*.
Args:
pipeline: ETLPipeline class to compile and execute.
params: ETLParams instance for this run.
include: If set, only steps whose names are in this sequence run.
correlation_id: Logical business unit identifier, stable across retries.
run_id: Execution-specific identifier for lineage records. If None,
a random UUID is generated. Callers that share a traceability
identifier with an orchestrator (e.g. Prefect) should pass it
explicitly so lineage and orchestrator records align.
attempt: Current attempt number (1-based).
last_attempt: Whether this is the final allowed attempt.
"""
_log.info("compile pipeline=%s", pipeline.__name__)
plan = self._compiler.compile(pipeline)
if include is not None:
_log.debug("filter plan include=%s", sorted(include))
plan = _filter_plan(plan, frozenset(include))
ctx = RunContext(
run_id=run_id if run_id is not None else str(uuid.uuid4()),
correlation_id=correlation_id,
attempt=attempt,
last_attempt=last_attempt,
)
try:
self._executor.run_pipeline(plan, params, ctx)
finally:
try:
flush_runner(self)
finally:
if self._checkpoint_store is not None:
try:
self._checkpoint_store.cleanup_run(ctx.run_id)
except Exception:
_log.warning(
"checkpoint cleanup failed run_id=%s", ctx.run_id, exc_info=True
)
[docs]
def flush(self) -> None:
"""Flush buffered ETL observability sinks after a run."""
self._executor.flush()
[docs]
def cleanup_correlation(self, correlation_id: str) -> None:
"""Remove all CORRELATION-scope intermediates for *correlation_id*."""
if self._checkpoint_store is None:
raise RuntimeError(
"cleanup_correlation() requires checkpoint_root (storage.temp.root) "
"to be configured in storage YAML."
)
self._checkpoint_store.cleanup_correlation(correlation_id)
__all__ = ["ETLRunner", "InvalidStageError"]