# Source code for loom.etl.testing._runners

"""PolarsStepRunner — in-memory Polars step test harness."""

from __future__ import annotations

from typing import Any

import polars as pl

from loom.etl.backends._predicate import predicate_to_sql
from loom.etl.compiler import ETLCompiler
from loom.etl.declarative.source import SourceSpec
from loom.etl.declarative.target import TargetSpec
from loom.etl.declarative.target._table import ReplaceWhereSpec
from loom.etl.executor import ETLExecutor
from loom.etl.testing._result import StepResult


class _PolarsCapturingWriter:
    def __init__(self) -> None:
        self.frame: pl.LazyFrame | pl.DataFrame | None = None
        self.spec: TargetSpec | None = None
        self._last_params: Any = None

    def write(
        self, frame: Any, spec: TargetSpec, params_instance: Any, *, streaming: bool = False
    ) -> None:
        _ = streaming
        self.frame = frame
        self.spec = spec
        self._last_params = params_instance


class _PolarsStubReader:
    def __init__(self, frames: dict[str, pl.LazyFrame]) -> None:
        self._frames = frames

    def read(self, spec: SourceSpec, _params_instance: Any) -> pl.LazyFrame:
        table_ref = getattr(spec, "table_ref", None)
        key = table_ref.ref if table_ref is not None else spec.alias
        return self._frames[key]

    def execute_sql(self, frames: dict[str, Any], query: str, /) -> pl.LazyFrame:
        ctx = pl.SQLContext()
        for name, frame in frames.items():
            ctx.register(name, frame)
        return ctx.execute(query, eager=False)


def _build_lazy_frame(data: list[tuple[Any, ...]], columns: list[str]) -> pl.LazyFrame:
    """Turn row tuples into a ``pl.LazyFrame`` with one column per name.

    Args:
        data: Rows as tuples; position ``i`` of each row feeds ``columns[i]``.
        columns: Column names aligned with the tuple positions.

    Returns:
        The lazy view of a ``pl.DataFrame`` built from per-column value lists.
        When *data* is empty every column is built from an empty list.
    """
    if data:
        # Pivot row-oriented tuples into a column-name -> values mapping.
        by_column = {name: [record[pos] for record in data] for pos, name in enumerate(columns)}
    else:
        by_column = {name: [] for name in columns}
    return pl.DataFrame(by_column).lazy()


class PolarsStepRunner:
    """In-memory test harness for Polars :class:`~loom.etl.ETLStep` subclasses.

    Seeds are plain Python tuples — no Polars dependency at definition time.
    Internally, each seed is converted to a ``polars.LazyFrame`` so
    ``execute()`` receives the same type as in production.

    No Delta I/O — reads and writes are captured in memory.

    Example::

        def test_double_amount(loom_polars_runner):
            loom_polars_runner.seed("raw.orders", [(1, 10.0), (2, 20.0)], ["id", "amount"])
            result = loom_polars_runner.run(DoubleAmountStep, NoParams())
            result.assert_count(2)
            result.assert_schema({"amount": LoomDtype.FLOAT64})
            result.show()
    """

    def __init__(self) -> None:
        # Logical table reference -> (rows, column names), as registered via seed().
        self._seeds: dict[str, tuple[list[tuple[Any, ...]], list[str]]] = {}
        # Holds the frame/spec/params captured by the most recent run().
        self._writer = _PolarsCapturingWriter()

    def seed(
        self,
        ref: str,
        data: list[tuple[Any, ...]],
        columns: list[str],
    ) -> PolarsStepRunner:
        """Register raw data under the logical table reference *ref*.

        Seeding the same *ref* again replaces the earlier entry.

        Args:
            ref: Logical table reference, e.g. ``"raw.orders"``.
            data: Row data as a list of tuples.
            columns: Column names aligned with the tuple positions.

        Returns:
            ``self`` for fluent chaining.
        """
        # Shallow copies keep later mutation of the caller's lists out of the seed.
        self._seeds[ref] = (list(data), list(columns))
        return self

    def run(self, step_cls: type[Any], params: Any) -> StepResult:
        """Compile and execute *step_cls* against the seeded tables.

        Args:
            step_cls: :class:`~loom.etl.ETLStep` subclass to execute.
            params: Concrete params instance for this run.

        Returns:
            :class:`~loom.etl.testing._result.StepResult` for assertions.

        Raises:
            KeyError: When a source table was not seeded.
            RuntimeError: When the step produced no output.
        """
        # Every seed becomes a LazyFrame keyed by its logical table reference.
        frames = {ref: _build_lazy_frame(data, cols) for ref, (data, cols) in self._seeds.items()}
        plan = ETLCompiler().compile_step(step_cls)
        # A fresh writer per run, so nothing captured by an earlier run is reused.
        self._writer = _PolarsCapturingWriter()
        ETLExecutor(_PolarsStubReader(frames), self._writer).run_step(plan, params)
        raw = self._writer.frame
        if raw is None:
            raise RuntimeError("Step produced no output — check that target is declared.")
        # The writer may have captured either a lazy or an eager frame.
        collected = raw.collect() if isinstance(raw, pl.LazyFrame) else raw
        return StepResult(collected)

    @property
    def target_spec(self) -> TargetSpec:
        """Target spec from the last :meth:`run` call.

        Raises:
            RuntimeError: When :meth:`run` has not been called yet.
        """
        if self._writer.spec is None:
            raise RuntimeError("No spec — call run() first.")
        return self._writer.spec

    @property
    def resolved_predicate(self) -> str | None:
        """SQL predicate resolved from the last run's target spec and params.

        Returns ``None`` when the write mode has no predicate.
        """
        spec = self._writer.spec
        if not isinstance(spec, ReplaceWhereSpec):
            return None
        return predicate_to_sql(spec.replace_predicate, self._writer._last_params)