# Source code for loom.etl.pipeline._pipeline

"""ETL pipeline base type.

An :class:`ETLPipeline` is the top-level entry point for compilation and
execution.  It declares an ordered set of :class:`~loom.etl.ETLProcess`
types with the same nested-list convention for parallelism.

Example::

    class DailyOrdersPipeline(ETLPipeline[DailyOrdersParams]):
        processes = [
            BuildStagingProcess,
            [EnrichProductsProcess, EnrichCustomersProcess],  # parallel
            AggregateProcess,
        ]
"""

from __future__ import annotations

from typing import Any, ClassVar, Generic, TypeVar

from loom.etl.pipeline._generics import _extract_generic_arg

# Type variable that ETLPipeline is generic over (``ETLPipeline[ParamsT]``).
ParamsT = TypeVar("ParamsT")

# Element type of ``ETLPipeline.processes``.  It is aliased to ``Any`` here;
# the trailing comment records the shape the alias is meant to stand for.
_ProcessItem = Any  # type[ETLProcess[ParamsT]] | list[type[ETLProcess[ParamsT]]]


class ETLPipeline(Generic[ParamsT]):
    """Top-level ETL orchestration unit.

    Declares an ordered list of :class:`~loom.etl.ETLProcess` types.  A nested
    list within :attr:`processes` declares a parallel group.

    The pipeline is the entry point for :class:`~loom.etl.compiler.ETLCompiler`
    and :class:`~loom.etl.compiler.ETLExecutor`.  All processes must share the
    same ``ParamsT``.

    Attributes:
        processes: Ordered list of process types.  Nested lists are parallel
            groups.

    Example::

        class DailyOrdersPipeline(ETLPipeline[DailyOrdersParams]):
            processes = [
                BuildStagingProcess,
                [EnrichProductsProcess, EnrichCustomersProcess],
                AggregateProcess,
            ]
    """

    # Base-class default: an empty pipeline.  Subclasses are expected to
    # assign their own list in the class body.
    processes: ClassVar[list[_ProcessItem]] = []

    # Set for every subclass by ``__init_subclass__``; ``None`` on the base.
    _params_type: ClassVar[type[Any] | None] = None

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Record the params type and validate ``processes`` on each subclass.

        Args:
            **kwargs: Class keyword arguments, forwarded unchanged to the next
                ``__init_subclass__`` in the MRO.

        Raises:
            TypeError: If the subclass body assigns ``processes`` to something
                that is not a ``list``.
        """
        super().__init_subclass__(**kwargs)

        # NOTE(review): presumably resolves the concrete argument given to
        # ``ETLPipeline[...]`` in the subclass bases -- confirm against
        # ``_extract_generic_arg`` in ``loom.etl.pipeline._generics``.
        cls._params_type = _extract_generic_arg(cls, ETLPipeline)

        # Inspect only the subclass's own namespace: an inherited ``processes``
        # was already validated when the class that declared it was created.
        # The ``[]`` fallback makes "not declared here" pass the check.
        declared = cls.__dict__.get("processes", [])
        if not isinstance(declared, list):
            raise TypeError(
                f"{cls.__qualname__}: 'processes' must be a list, "
                f"got {type(declared).__name__}"
            )