Source code for loom.etl.runtime.contracts
"""ETL runtime contracts for catalog discovery and I/O operations.
These protocols define the integration surface between the ETL framework and
backend implementations (Spark, Polars, stubs).
Dependency direction
--------------------
``contracts.py`` depends only on ETL domain types (``TableRef``, ``SourceSpec``,
``TargetSpec``). It does not import compiler or runner modules.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from loom.etl.declarative.expr._refs import TableRef
from loom.etl.declarative.source import SourceSpec
from loom.etl.declarative.target import TargetSpec
from loom.etl.schema._schema import ColumnSchema
if TYPE_CHECKING:
from loom.etl.lineage._records import WriteContext
[docs]
@runtime_checkable
class TableDiscovery(Protocol):
"""Protocol for catalog-backed table existence and schema discovery.
Used by :class:`~loom.etl.compiler.ETLCompiler` when a catalog is injected
at construction time to validate that source and target tables are valid
before execution begins.
"""
[docs]
def exists(self, ref: TableRef) -> bool:
"""Return ``True`` when the table exists in the catalog.
Args:
ref: Logical table reference to check.
Returns:
``True`` when the table is registered.
"""
...
[docs]
def columns(self, ref: TableRef) -> tuple[str, ...]:
"""Return table column names in schema order.
Args:
ref: Logical table reference.
Returns:
Tuple of column names, or ``()`` when unknown.
"""
...
[docs]
def schema(self, ref: TableRef) -> tuple[ColumnSchema, ...] | None:
"""Return full table schema.
Args:
ref: Logical table reference.
Returns:
Ordered column schema tuple, or ``None`` when table is missing.
"""
...
[docs]
def update_schema(self, ref: TableRef, schema: tuple[ColumnSchema, ...]) -> None:
"""Persist table schema after a successful write.
Args:
ref: Logical table reference.
schema: Authoritative table schema.
"""
...
[docs]
@runtime_checkable
class SourceReader(Protocol):
"""Protocol for reading one ETL source specification into a frame."""
[docs]
def read(self, spec: SourceSpec, params_instance: Any, /) -> Any:
"""Read the source and return a frame.
Args:
spec: Compiled source specification.
params_instance: Concrete params for current run.
Returns:
Backend frame type accepted by the step ``execute()`` method.
"""
...
@runtime_checkable
class StreamingSourceReader(Protocol):
"""Optional capability protocol for memory-bounded streaming reads.
Implementations must return a backend frame (typically a
:class:`polars.LazyFrame`) whose terminal ``collect(engine="streaming")``
consumes the source incrementally — without materializing the full result
set in memory. Typical implementations spool a server-side cursor (e.g.
ClickHouse ``query_arrow_stream``) into a temporary IPC file and return a
``pl.scan_ipc(...)`` over it, capping peak RAM at one batch.
This capability is opt-in: readers expose it only when they can guarantee
bounded memory. ``ETLExecutor`` checks for this protocol via
``isinstance(reader, StreamingSourceReader)`` and raises a clear
``TypeError`` when a step requests ``streaming=True`` against a reader
that lacks the capability, to prevent silent OOM regressions.
Example:
>>> reader: StreamingSourceReader = ClickHouseSourceReader(url=...)
>>> lazy = reader.read_streaming(spec, params)
>>> df = lazy.collect(engine="streaming")
"""
def read_streaming(self, spec: SourceSpec, params_instance: Any, /) -> Any:
"""Read the source using a memory-bounded streaming strategy.
Args:
spec: Compiled source specification.
params_instance: Concrete params for current run.
Returns:
Backend frame type (typically ``pl.LazyFrame``) whose downstream
``collect(engine="streaming")`` consumes the source incrementally.
Raises:
TypeError: When the underlying client cannot stream this spec.
"""
...
[docs]
@runtime_checkable
class SQLExecutor(Protocol):
"""Optional capability protocol for SQL execution over source frames."""
[docs]
def execute_sql(self, frames: dict[str, Any], query: str, /) -> Any:
"""Execute a SQL query against backend frames.
Used by ``StepSQL`` execution path in :class:`~loom.etl.executor.ETLExecutor`.
Args:
frames: Alias -> frame mapping from resolved step sources.
query: SQL query text.
Returns:
Backend frame type with query results.
"""
...
[docs]
@runtime_checkable
class TargetWriter(Protocol):
"""Protocol for writing one frame into one ETL target specification."""
[docs]
def write(
self,
frame: Any,
spec: TargetSpec,
params_instance: Any,
/,
*,
streaming: bool = False,
write_ctx: WriteContext | None = None,
) -> None:
"""Write frame to target.
Args:
frame: Frame returned by step ``execute()``.
spec: Compiled target specification.
params_instance: Concrete params for current run.
streaming: Hint for lazy backends to use streaming materialization.
write_ctx: Execution context for audit-column injection.
``None`` disables audit columns regardless of config.
"""
...
__all__ = [
"TableDiscovery",
"SourceReader",
"StreamingSourceReader",
"SQLExecutor",
"TargetWriter",
]