Source code for loom.etl.runtime.contracts
"""ETL runtime contracts for catalog discovery and I/O operations.
These protocols define the integration surface between the ETL framework and
backend implementations (Spark, Polars, stubs).
Dependency direction
--------------------
``contracts.py`` depends only on ETL domain types (``TableRef``, ``SourceSpec``,
``TargetSpec``). It does not import compiler or runner modules.
"""
from __future__ import annotations
from typing import Any, Protocol, runtime_checkable
from loom.etl.declarative.expr._refs import TableRef
from loom.etl.declarative.source import SourceSpec
from loom.etl.declarative.target import TargetSpec
from loom.etl.schema._schema import ColumnSchema
[docs]
@runtime_checkable
class TableDiscovery(Protocol):
"""Protocol for catalog-backed table existence and schema discovery.
Used by :class:`~loom.etl.compiler.ETLCompiler` when a catalog is injected
at construction time to validate that source and target tables are valid
before execution begins.
"""
[docs]
def exists(self, ref: TableRef) -> bool:
"""Return ``True`` when the table exists in the catalog.
Args:
ref: Logical table reference to check.
Returns:
``True`` when the table is registered.
"""
...
[docs]
def columns(self, ref: TableRef) -> tuple[str, ...]:
"""Return table column names in schema order.
Args:
ref: Logical table reference.
Returns:
Tuple of column names, or ``()`` when unknown.
"""
...
[docs]
def schema(self, ref: TableRef) -> tuple[ColumnSchema, ...] | None:
"""Return full table schema.
Args:
ref: Logical table reference.
Returns:
Ordered column schema tuple, or ``None`` when table is missing.
"""
...
[docs]
def update_schema(self, ref: TableRef, schema: tuple[ColumnSchema, ...]) -> None:
"""Persist table schema after a successful write.
Args:
ref: Logical table reference.
schema: Authoritative table schema.
"""
...
[docs]
@runtime_checkable
class SourceReader(Protocol):
"""Protocol for reading one ETL source specification into a frame."""
[docs]
def read(self, spec: SourceSpec, params_instance: Any, /) -> Any:
"""Read the source and return a frame.
Args:
spec: Compiled source specification.
params_instance: Concrete params for current run.
Returns:
Backend frame type accepted by the step ``execute()`` method.
"""
...
[docs]
@runtime_checkable
class SQLExecutor(Protocol):
"""Optional capability protocol for SQL execution over source frames."""
[docs]
def execute_sql(self, frames: dict[str, Any], query: str, /) -> Any:
"""Execute a SQL query against backend frames.
Used by ``StepSQL`` execution path in :class:`~loom.etl.executor.ETLExecutor`.
Args:
frames: Alias -> frame mapping from resolved step sources.
query: SQL query text.
Returns:
Backend frame type with query results.
"""
...
[docs]
@runtime_checkable
class TargetWriter(Protocol):
"""Protocol for writing one frame into one ETL target specification."""
[docs]
def write(
self, frame: Any, spec: TargetSpec, params_instance: Any, /, *, streaming: bool = False
) -> None:
"""Write frame to target.
Args:
frame: Frame returned by step ``execute()``.
spec: Compiled target specification.
params_instance: Concrete params for current run.
streaming: Hint for lazy backends to use streaming materialization.
"""
...
__all__ = ["TableDiscovery", "SourceReader", "SQLExecutor", "TargetWriter"]