Source code for loom.etl.pipeline._process
"""ETL process base type.
An :class:`ETLProcess` is an ordered set of :class:`~loom.etl.ETLStep`
types. Steps are declared as a list; a nested list marks a parallel group
— all steps in the group run concurrently, and the next item in the outer
list waits for all of them to finish.
Example::
class BuildStagingProcess(ETLProcess[DailyOrdersParams]):
steps = [
ExtractRawOrders,
[ValidateOrders, ValidateCustomers], # parallel
JoinAndStage,
]
"""
from __future__ import annotations
from typing import Any, ClassVar, Generic, TypeVar
from loom.etl.pipeline._generics import _extract_generic_arg
# Type variable for the parameters object shared by a process and its steps.
ParamsT = TypeVar("ParamsT")

# One entry of ``ETLProcess.steps``: either a single step type or a list of
# step types forming a parallel group. It is ``Any`` at runtime; the intended
# shape is ``type[ETLStep[ParamsT]] | list[type[ETLStep[ParamsT]]]``.
_StepItem = Any
[docs]
class ETLProcess(Generic[ParamsT]):
    """Base class for ordered sequences of ETL steps.

    Subclasses list their step types in :attr:`steps`. A nested list inside
    :attr:`steps` is a parallel group: the executor dispatches every step in
    the group concurrently and waits for all of them to complete before
    moving on to the next item.

    Every step must use the same ``ParamsT`` as the process; the
    :class:`~loom.etl.compiler.ETLCompiler` enforces this.

    Attributes:
        steps: Ordered list of step types. Nested lists are parallel groups.

    Raises:
        TypeError: At class-definition time, if a subclass declares
            ``steps`` as anything other than a ``list``.

    Example::

        class DailyPipeline(ETLProcess[DailyOrdersParams]):
            steps = [
                IngestStep,
                [EnrichA, EnrichB],  # parallel
                AggregateStep,
            ]
    """

    steps: ClassVar[list[_StepItem]] = []
    # Generic argument recovered from the subclass's ``ETLProcess[...]`` base
    # by ``_extract_generic_arg``; stays ``None`` on the base class itself.
    _params_type: ClassVar[type[Any] | None] = None

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Record the params type and validate ``steps`` for a new subclass.

        Raises:
            TypeError: If the subclass body assigns ``steps`` to a non-list.
        """
        super().__init_subclass__(**kwargs)
        cls._params_type = _extract_generic_arg(cls, ETLProcess)

        # Only a ``steps`` declared directly in this class body is checked;
        # an inherited value is not re-validated here.
        declared = cls.__dict__.get("steps", [])
        if isinstance(declared, list):
            return
        raise TypeError(
            f"{cls.__qualname__}: 'steps' must be a list, "
            f"got {type(declared).__name__}"
        )