# Source code for loom.etl.pipeline._step_sql

"""StepSQL — SQL-first ETL step.

Declares the transformation as SQL text instead of implementing a Python
``execute()`` body. SQL is rendered by the step, then executed by the runtime
reader backend (Spark/Polars) via ``SQLExecutor.execute_sql``.

Internal — not part of the public API surface, exposed via ``loom.etl``.
"""

from __future__ import annotations

import typing
from typing import Any, ClassVar, Generic, TypeVar, cast

from loom.etl.pipeline._sql import resolve_sql
from loom.etl.pipeline._step import ETLStep

ParamsT = TypeVar("ParamsT")
FrameT = TypeVar("FrameT")


class StepSQL(ETLStep[ParamsT], Generic[ParamsT, FrameT]):
    """ETL step whose transformation is written as SQL rather than Python.

    Subclasses provide ``sql`` either as a class-level string (static SQL) or
    as a ``@staticmethod`` that receives the params object and returns the SQL
    text (dynamic SQL).

    Sources still support ``.where()`` for pre-filtering before the frame is
    registered as a view. Target write modes work identically to
    :class:`~loom.etl.ETLStep`.

    .. warning::
        Avoid interpolating raw string params directly. Use
        ``FromTable.where()`` for source-level filtering instead.
    """

    # Static SQL text, or a ``@staticmethod`` returning SQL for given params.
    sql: ClassVar[str]

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Apply the ``StepSQL[Params, Frame]`` arguments to each new subclass."""
        super().__init_subclass__(**kwargs)
        declared_params, declared_frame = _extract_stepsql_types(cls)
        # ETLStep derives its params type from ``ETLStep[T]``, not from
        # ``StepSQL[T, F]``, so the value is overridden here when one is found.
        if declared_params is not None:
            cls._params_type = declared_params
        if declared_frame is not None:
            _generate_execute(cls, declared_frame)

    def execute(self, params: Any, **frames: Any) -> Any:
        """Reject direct invocation; a SQL step has no Python body.

        Raises:
            NotImplementedError: On every call.
        """
        # Arguments are accepted only to keep the ``execute`` signature.
        del params, frames
        step_name = type(self).__qualname__
        raise NotImplementedError(
            f"{step_name} is a SQL step. "
            "It is executed by ETLExecutor via SQLExecutor.execute_sql()."
        )

    def render_sql(self, params: Any) -> str:
        """Return the SQL text of this step for the given ``params``."""
        step_cls = type(self)
        return _resolve_query(step_cls, params)
def _extract_stepsql_types(cls: type) -> tuple[type | None, type | None]: for base in getattr(cls, "__orig_bases__", ()): origin = getattr(base, "__origin__", None) if origin is StepSQL: args = typing.get_args(base) params_t = cast(type, args[0]) if len(args) > 0 else None frame_t = cast(type, args[1]) if len(args) > 1 else None return params_t, frame_t return None, None def _generate_execute(cls: type, return_type: type) -> None: """Inject execute() return annotation inferred from StepSQL generic.""" def execute(self: Any, params: Any, **frames: Any) -> Any: _ = params _ = frames raise NotImplementedError( f"{type(self).__qualname__} is a SQL step. " "It is executed by ETLExecutor via SQLExecutor.execute_sql()." ) execute.__annotations__["return"] = return_type cls.execute = execute # type: ignore[attr-defined] def _resolve_query(cls: type, params: Any) -> str: raw = cls.sql # type: ignore[attr-defined] if callable(raw): return raw(params) # type: ignore[no-any-return] return resolve_sql(raw, params)