Source code for loom.etl.runner.core

"""ETLRunner — single entry point that wires config, compiler, and executor."""

from __future__ import annotations

import logging
import uuid
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

import msgspec

if TYPE_CHECKING:
    from pyspark.sql import SparkSession

from loom.etl.checkpoint import CheckpointStore, TempCleaner
from loom.etl.compiler import ETLCompiler
from loom.etl.executor import ETLExecutor, ParallelDispatcher
from loom.etl.observability import (
    CompositeObserver,
    ETLRunObserver,
    ObservabilityConfig,
    RunContext,
    make_observers,
)
from loom.etl.pipeline._pipeline import ETLPipeline
from loom.etl.runner._wiring import (
    make_backends,
    make_checkpoint_store,
    make_execution_record_writer,
)
from loom.etl.runner.config_loader import _load_yaml
from loom.etl.runner.errors import InvalidStageError
from loom.etl.runner.filtering import _filter_plan
from loom.etl.runtime.contracts import SourceReader, TargetWriter
from loom.etl.storage._config import StorageConfig, convert_storage_config

_log = logging.getLogger(__name__)


class ETLRunner:
    """Wire storage config, backend I/O, and executor into one callable.

    Args:
        reader: Source reader implementation.
        writer: Target writer implementation.
        observers: Lifecycle observers wrapped in a composite observer.
        dispatcher: Parallel task dispatcher.
        checkpoint_store: Optional store for stage checkpoints and
            CORRELATION-scope intermediates.
    """

    def __init__(
        self,
        reader: SourceReader,
        writer: TargetWriter,
        observers: Sequence[ETLRunObserver] = (),
        dispatcher: ParallelDispatcher | None = None,
        checkpoint_store: CheckpointStore | None = None,
    ) -> None:
        composite = CompositeObserver(observers)
        self._executor = ETLExecutor(reader, writer, (composite,), dispatcher, checkpoint_store)
        self._compiler = ETLCompiler()
        self._checkpoint_store = checkpoint_store
    @classmethod
    def from_config(
        cls,
        config: StorageConfig,
        obs_config: ObservabilityConfig | None = None,
        *,
        spark: SparkSession | None = None,
        dispatcher: ParallelDispatcher | None = None,
        cleaner: TempCleaner | None = None,
    ) -> ETLRunner:
        """Build an :class:`ETLRunner` from resolved config objects."""
        resolved_obs_config = obs_config or ObservabilityConfig()
        reader, writer = make_backends(config, spark)
        record_writer = make_execution_record_writer(config, resolved_obs_config, spark)
        observers = make_observers(resolved_obs_config, record_writer=record_writer)
        checkpoint_store = make_checkpoint_store(config, spark, cleaner)
        return cls(reader, writer, observers, dispatcher, checkpoint_store)
    @classmethod
    def from_yaml(
        cls,
        path: str,
        *,
        spark: SparkSession | None = None,
        dispatcher: ParallelDispatcher | None = None,
    ) -> ETLRunner:
        """Load config from a YAML file and build an :class:`ETLRunner`."""
        _log.debug("load yaml path=%s", path)
        storage_config, obs_config = _load_yaml(path)
        storage_config.validate()
        return cls.from_config(storage_config, obs_config, spark=spark, dispatcher=dispatcher)
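    # A minimal construction sketch (the YAML path and the `spark` session are
    # placeholders, not part of this module):
    #
    #     runner = ETLRunner.from_yaml("conf/storage.yaml", spark=spark)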
    @classmethod
    def from_spark(
        cls,
        spark: SparkSession,
        obs_config: ObservabilityConfig | None = None,
        *,
        dispatcher: ParallelDispatcher | None = None,
        cleaner: TempCleaner | None = None,
    ) -> ETLRunner:
        """Build an :class:`ETLRunner` using Spark as the execution engine."""
        config = StorageConfig(engine="spark")
        return cls.from_config(
            config, obs_config, spark=spark, dispatcher=dispatcher, cleaner=cleaner
        )
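    # Quickest entry point when a live SparkSession is already at hand, e.g.
    # in a notebook (sketch; `spark` is assumed to be an active session):
    #
    #     runner = ETLRunner.from_spark(spark)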
    @classmethod
    def from_dict(
        cls,
        storage: dict[str, Any],
        observability: dict[str, Any] | None = None,
        *,
        spark: SparkSession | None = None,
        dispatcher: ParallelDispatcher | None = None,
        cleaner: TempCleaner | None = None,
    ) -> ETLRunner:
        """Build an :class:`ETLRunner` from pre-resolved plain Python dicts."""
        storage_config = convert_storage_config(storage)
        storage_config.validate()
        obs_config = (
            msgspec.convert(observability, ObservabilityConfig)
            if observability is not None
            else ObservabilityConfig()
        )
        return cls.from_config(
            storage_config, obs_config, spark=spark, dispatcher=dispatcher, cleaner=cleaner
        )
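    # A from_dict sketch. The "engine" key mirrors StorageConfig(engine="spark")
    # as used by from_spark above; any additional storage keys depend on the
    # StorageConfig schema and are not shown here:
    #
    #     runner = ETLRunner.from_dict({"engine": "spark"}, spark=spark)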
    def run(
        self,
        pipeline: type[ETLPipeline[Any]],
        params: Any,
        *,
        include: Sequence[str] | None = None,
        correlation_id: str | None = None,
        attempt: int = 1,
        last_attempt: bool = True,
    ) -> None:
        """Compile, optionally filter, and execute *pipeline*."""
        _log.info("compile pipeline=%s", pipeline.__name__)
        plan = self._compiler.compile(pipeline)
        if include is not None:
            _log.debug("filter plan include=%s", sorted(include))
            plan = _filter_plan(plan, frozenset(include))
        ctx = RunContext(
            run_id=str(uuid.uuid4()),
            correlation_id=correlation_id,
            attempt=attempt,
            last_attempt=last_attempt,
        )
        self._executor.run_pipeline(plan, params, ctx)
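    # A run sketch showing stage filtering and retry bookkeeping (the pipeline
    # class, params object, and stage names are hypothetical):
    #
    #     runner.run(
    #         DailySalesPipeline,
    #         params,
    #         include=["extract_orders", "load_orders"],  # execute only these stages
    #         correlation_id="daily-2024-01-01",          # shared across retries
    #         attempt=2,
    #         last_attempt=False,                         # further retries may follow
    #     )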
    def cleanup_correlation(self, correlation_id: str) -> None:
        """Remove all CORRELATION-scope intermediates for *correlation_id*."""
        if self._checkpoint_store is None:
            raise RuntimeError(
                "cleanup_correlation() requires checkpoint_root (storage.tmp_root) "
                "to be configured in storage YAML."
            )
        self._checkpoint_store.cleanup_correlation(correlation_id)
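    # Pairing sketch: once the final attempt for a correlation_id has succeeded,
    # drop its CORRELATION-scope intermediates. Requires checkpoint_root
    # (storage.tmp_root) to be configured, per the guard above (id value is
    # hypothetical):
    #
    #     runner.cleanup_correlation("daily-2024-01-01")
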
__all__ = ["ETLRunner", "InvalidStageError"]