Source code for loom.etl.pipeline._step

"""ETL step base type.

:class:`ETLStep` is the single unit of ETL work: it declares its sources,
one target, and a pure ``execute()`` transformation method.

The backend (Polars, Spark) is determined by the configured
:class:`~loom.etl.runtime.contracts.SourceReader` and
:class:`~loom.etl.runtime.contracts.TargetWriter` injected at runner build time —
no declaration is needed on the step itself.

Definition-time validation (via ``__init_subclass__``) catches the most
common structural errors early, before the full :class:`~loom.etl.compiler.ETLCompiler`
runs.
"""

from __future__ import annotations

from enum import Enum
from typing import Any, ClassVar, Generic, TypeVar

from loom.etl.declarative.source import FromFile, FromTable, FromTemp, Sources, SourceSet
from loom.etl.declarative.target import IntoFile, IntoTable, IntoTemp
from loom.etl.pipeline._generics import _extract_generic_arg

ParamsT = TypeVar("ParamsT")

_RESERVED_NAMES: frozenset[str] = frozenset(
    {
        "sources",
        "target",
        "streaming",
        "execute",
        "_params_type",
        "_source_form",
        "_inline_sources",
    }
)

_SOURCE_TYPES = (FromTable, FromFile, FromTemp)
_TARGET_TYPES = (IntoTable, IntoFile, IntoTemp)


class _SourceForm(Enum):
    INLINE = "inline"  # Form 1: class-level FromTable / FromFile attributes
    GROUPED = "grouped"  # Form 2: sources = Sources(...) or SourceSet instance
    NONE = "none"  # step declares no sources (generator steps)


[docs] class ETLStep(Generic[ParamsT]): """Base class for all ETL transformation steps. Subclass, declare :attr:`sources` and :attr:`target`, then implement :meth:`execute`. The ``*`` separator in ``execute`` makes all injected DataFrame parameters keyword-only — the executor always injects by name. Source forms ------------ Three authoring forms are supported (see :mod:`loom.etl._source`). Mixing Form 1 and Form 2 in the same class is a definition-time error. Example:: class BuildOrdersStaging(ETLStep[DailyOrdersParams]): sources = Sources( orders=FromTable("raw.orders").where( (col("year") == params.run_date.year) & (col("month") == params.run_date.month), ), customers=FromTable("raw.customers"), ) target = IntoTable("staging.orders").replace_partitions( values={"year": params.run_date.year, "month": params.run_date.month} ) def execute( self, params: DailyOrdersParams, *, orders: pl.DataFrame, customers: pl.DataFrame, ) -> pl.DataFrame: return orders.join(customers, on="customer_id", how="left") """ sources: ClassVar[Sources | SourceSet[Any] | None] = None target: ClassVar[IntoTable | IntoFile | IntoTemp | None] = None streaming: ClassVar[bool] = False # Set by __init_subclass__ _params_type: ClassVar[type[Any] | None] = None _source_form: ClassVar[_SourceForm] = _SourceForm.NONE _inline_sources: ClassVar[dict[str, FromTable | FromFile | FromTemp]] def __init_subclass__(cls, **kwargs: Any) -> None: super().__init_subclass__(**kwargs) cls._params_type = _extract_generic_arg(cls, ETLStep) cls._inline_sources = {} _validate_streaming_flag(cls) _validate_and_classify_sources(cls)
[docs] def execute(self, params: ParamsT, **frames: Any) -> Any: """Transform source frames into the output frame. Must be overridden by every concrete step. Declare the ``params`` and frame parameters explicitly — the compiler validates the signature against :attr:`sources` at compile time. Args: params: Typed params instance for this run. **frames: Source DataFrames injected by the executor, keyed by the source alias declared in :attr:`sources`. Returns: Transformed DataFrame (type must match the active backend). """ raise NotImplementedError(f"{type(self).__qualname__} must implement execute()")
# --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _validate_and_classify_sources(cls: type[Any]) -> None: """Detect source form and enforce mutual exclusion rules.""" inline = { name: val for name, val in cls.__dict__.items() if isinstance(val, _SOURCE_TYPES) and name not in _RESERVED_NAMES } has_grouped = isinstance(cls.__dict__.get("sources"), (Sources, SourceSet)) if inline and has_grouped: raise TypeError( f"{cls.__qualname__}: cannot mix inline source attributes with " f"sources=Sources(...) or sources=SourceSet — use one form only." ) if inline: cls._source_form = _SourceForm.INLINE cls._inline_sources = inline elif has_grouped: cls._source_form = _SourceForm.GROUPED else: cls._source_form = _SourceForm.NONE def _validate_streaming_flag(cls: type[Any]) -> None: """Enforce that ``streaming`` is explicitly boolean at class definition time.""" streaming = getattr(cls, "streaming", False) if not isinstance(streaming, bool): raise TypeError( f"{cls.__qualname__}: 'streaming' must be bool, got {type(streaming).__name__}" )