"""Target builder types — IntoTable, IntoFile, IntoTemp, SchemaMode.
Public API — import from :mod:`loom.etl.declarative.target` or :mod:`loom.etl.declarative`.
Each ETL step declares exactly one target. Write mode and schema mode are
set by chaining a write-intent method::
IntoTable("staging.orders").append()
IntoTable("staging.orders").replace()
IntoTable("staging.orders").replace(schema=SchemaMode.EVOLVE)
IntoTable("staging.orders").replace(schema=SchemaMode.OVERWRITE)
IntoTable("staging.orders").replace_partitions("year", "month")
IntoTable("staging.orders").replace_where(
(col("year") == params.run_date.year) & (col("month") == params.run_date.month)
)
IntoTable("staging.orders").upsert(keys=("order_id",))
IntoFile("s3://exports/report_{run_date}.csv", format=Format.CSV)
"""
from __future__ import annotations
from typing import Any
from loom.etl.checkpoint import CheckpointScope
from loom.etl.declarative._format import Format
from loom.etl.declarative._utils import _clone_slots
from loom.etl.declarative._write_options import WriteOptions
from loom.etl.declarative.expr._params import ParamExpr
from loom.etl.declarative.expr._predicate import AndPred, EqPred, PredicateNode
from loom.etl.declarative.expr._refs import TableRef, UnboundColumnRef
from loom.etl.declarative.target._file import FileSpec
from loom.etl.declarative.target._schema_mode import SchemaMode
from loom.etl.declarative.target._table import (
AppendSpec,
ReplacePartitionsSpec,
ReplaceSpec,
ReplaceWhereSpec,
UpsertSpec,
)
from loom.etl.declarative.target._temp import TempFanInSpec, TempSpec
def _build_eq_predicate(values: dict[str, ParamExpr]) -> PredicateNode:
items = list(values.items())
head_col, head_expr = items[0]
result: PredicateNode = EqPred(left=UnboundColumnRef(head_col), right=head_expr)
for col_name, expr in items[1:]:
result = AndPred(left=result, right=EqPred(left=UnboundColumnRef(col_name), right=expr))
return result
class IntoTable:
    """Declare a Delta table as the ETL step target.

    The write mode is set by chaining one of:
    :meth:`append`, :meth:`replace`, :meth:`replace_partitions`,
    :meth:`replace_where`, :meth:`upsert`.

    Args:
        ref: Logical table reference — ``str`` or :class:`~loom.etl.TableRef`.

    Example::

        # Full replace
        target = IntoTable("staging.orders").replace()

        # Replace only the partitions present in the batch
        target = IntoTable("staging.orders").replace_partitions("year", "month")

        # Replace a date-range for backfill
        target = IntoTable("staging.orders").replace_where(
            col("date").between(params.start_date, params.end_date)
        )
    """

    __slots__ = ("_ref", "_spec")

    def __init__(self, ref: str | TableRef) -> None:
        table_ref = TableRef(ref) if isinstance(ref, str) else ref
        self._ref = table_ref
        # Default write mode is REPLACE. Call a write-mode method to change it.
        self._spec: Any = ReplaceSpec(table_ref=table_ref)

    def append(self, *, schema: SchemaMode = SchemaMode.STRICT) -> IntoTable:
        """Write mode: append rows to the target table.

        Args:
            schema: Schema evolution strategy. Defaults to
                :attr:`~SchemaMode.STRICT`.

        Returns:
            New ``IntoTable`` with APPEND mode.
        """
        return self._with(AppendSpec(table_ref=self._ref, schema_mode=schema))

    def replace(self, *, schema: SchemaMode = SchemaMode.STRICT) -> IntoTable:
        """Write mode: full replace of the target table.

        Overwrites all data in the table. Use :meth:`replace_partitions` for
        partition-scoped overwrite or :meth:`replace_where` for predicate-scoped.

        Args:
            schema: Schema evolution strategy. Use
                :attr:`~SchemaMode.OVERWRITE` to replace the table schema
                entirely alongside the data.

        Returns:
            New ``IntoTable`` with REPLACE mode.
        """
        return self._with(ReplaceSpec(table_ref=self._ref, schema_mode=schema))

    def replace_partitions(
        self,
        *cols: str,
        schema: SchemaMode = SchemaMode.STRICT,
    ) -> IntoTable:
        """Write mode: replace the partitions present in the batch frame.

        Collects the distinct partition values from the frame at write time and
        builds the replace predicate dynamically — no params required.

        For replacing a specific partition whose values come from run params,
        use :meth:`replace_partition`.

        Args:
            *cols: Partition column names to collect from the frame.
            schema: Schema evolution strategy.

        Returns:
            New ``IntoTable`` with REPLACE_PARTITIONS mode.

        Raises:
            ValueError: If no column names are provided.

        Example::

            target = IntoTable("staging.orders").replace_partitions("year", "month")
        """
        if not cols:
            raise ValueError("replace_partitions: at least one partition column name is required.")
        return self._with(
            ReplacePartitionsSpec(
                table_ref=self._ref,
                partition_cols=cols,
                schema_mode=schema,
            )
        )

    def replace_partition(
        self,
        *,
        schema: SchemaMode = SchemaMode.STRICT,
        **partition: ParamExpr,
    ) -> IntoTable:
        """Write mode: replace a specific partition identified by exact column values from params.

        Use when the partition to replace is known at pipeline design time and
        its values come from run params. The writer resolves each value against
        the concrete params at runtime and issues a Delta ``replaceWhere`` on the
        resulting equality predicate — no collect required.

        For dynamic replacement (values inferred from the batch), use
        :meth:`replace_partitions`. For arbitrary predicates, use
        :meth:`replace_where`.

        Args:
            schema: Schema evolution strategy.
            **partition:
                Partition column name →
                :class:`~loom.etl.declarative.expr._params.ParamExpr` pairs
                (e.g. ``year=params.run_date.year``).

        Returns:
            New ``IntoTable`` with REPLACE_WHERE mode.

        Raises:
            ValueError: If no column=value pairs are provided.

        Example::

            target = IntoTable("staging.orders").replace_partition(
                year=params.run_date.year,
                month=params.run_date.month,
            )
        """
        if not partition:
            raise ValueError("replace_partition: at least one column=value pair is required.")
        predicate = _build_eq_predicate(partition)
        return self._with(
            ReplaceWhereSpec(
                table_ref=self._ref,
                replace_predicate=predicate,
                schema_mode=schema,
            )
        )

    def replace_where(
        self,
        predicate: PredicateNode,
        *,
        schema: SchemaMode = SchemaMode.STRICT,
    ) -> IntoTable:
        """Write mode: replace rows matching an explicit predicate.

        The predicate is resolved against the run params and passed to Delta as
        ``replaceWhere``. Only the matching data is overwritten — Delta uses
        partition pruning so only affected files are rewritten.

        Typical use case: backfill / reprocessing a date range.

        Args:
            predicate: Predicate built with the :func:`~loom.etl.col` /
                :data:`~loom.etl.params` DSL.
            schema: Schema evolution strategy.

        Returns:
            New ``IntoTable`` with REPLACE_WHERE mode.

        Example::

            target = IntoTable("staging.orders").replace_where(
                col("date").between(params.start_date, params.end_date)
            )
        """
        return self._with(
            ReplaceWhereSpec(
                table_ref=self._ref,
                replace_predicate=predicate,
                schema_mode=schema,
            )
        )

    def upsert(
        self,
        *,
        keys: tuple[str, ...],
        partition_cols: tuple[str, ...] = (),
        exclude: tuple[str, ...] = (),
        include: tuple[str, ...] = (),
        schema: SchemaMode = SchemaMode.STRICT,
    ) -> IntoTable:
        """Write mode: merge rows using the given key columns (UPSERT / MERGE).

        On first write the table is created from the frame. Subsequent writes
        issue a Delta MERGE: matched rows are updated, unmatched rows are
        inserted.

        Declaring ``partition_cols`` is strongly recommended for large tables —
        it allows Delta to prune files at the log level before evaluating the
        join condition. Without it, every MERGE forces a full table scan.

        Args:
            keys: Columns that uniquely identify a row. Used in the
                MERGE ``ON`` join condition.
            partition_cols: Partition columns to include in the MERGE ``ON``
                predicate and for Delta log pruning. Must be a
                subset of the frame columns.
            exclude: Columns to exclude from ``UPDATE SET`` on match.
                Keys and partition columns are always excluded.
                Mutually exclusive with *include*.
            include: Explicit allow-list of columns to update on match.
                Keys and partition columns are always excluded even
                if listed here. Mutually exclusive with *exclude*.
            schema: Schema evolution strategy. Defaults to
                :attr:`~SchemaMode.STRICT`.

        Returns:
            New ``IntoTable`` with UPSERT mode.

        Raises:
            ValueError: If *keys* is empty, or if both *exclude* and
                *include* are provided.

        Example::

            target = IntoTable("events.orders").upsert(
                keys=("order_id",),
                partition_cols=("year", "month"),
                exclude=("created_at",),
            )
        """
        # Enforce the documented contract up front instead of failing at
        # MERGE time with a confusing writer-side error.
        if not keys:
            raise ValueError("upsert: at least one key column is required.")
        if exclude and include:
            raise ValueError("upsert: 'exclude' and 'include' are mutually exclusive.")
        return self._with(
            UpsertSpec(
                table_ref=self._ref,
                upsert_keys=keys,
                partition_cols=partition_cols,
                upsert_exclude=exclude,
                upsert_include=include,
                schema_mode=schema,
            )
        )

    def _with(self, spec: Any) -> IntoTable:
        # Copy-on-write: clone the slots and swap the spec so chained calls
        # never mutate the original builder.
        new = _clone_slots(self, IntoTable, IntoTable.__slots__)
        object.__setattr__(new, "_spec", spec)
        return new

    def _to_spec(self) -> Any:
        return self._spec

    def __repr__(self) -> str:
        mode = type(self._spec).__name__.removesuffix("Spec").lower()
        return f"IntoTable({self._ref.ref!r}, mode={mode!r})"
class IntoFile:
    """Declare a file as the ETL step target (CSV, JSON, XLSX, Parquet).

    The ``path`` supports ``{field_name}`` template placeholders resolved
    from params at runtime.

    Args:
        path: File path or template, e.g. ``"s3://exports/orders_{run_date}.csv"``.
        format: :class:`~loom.etl.Format` of the output file.

    Example::

        target = IntoFile("s3://exports/summary_{run_date}.xlsx", format=Format.XLSX)
    """

    __slots__ = ("_path", "_format", "_is_alias", "_write_options")

    def __init__(self, path: str, *, format: Format) -> None:
        # A directly-constructed target always carries a concrete path.
        self._is_alias = False
        self._write_options: WriteOptions | None = None
        self._path = path
        self._format = format

    @classmethod
    def alias(cls, name: str, *, format: Format) -> IntoFile:
        """Declare a file target by logical alias resolved from storage config.

        The physical URI is looked up in ``storage.files`` at runtime via the
        injected :class:`~loom.etl.storage.FileLocator`. Use this form when
        the path is environment-specific and should not be hard-coded in the
        pipeline.

        Args:
            name: Logical file alias matching a ``storage.files[].name``
                entry in the config YAML.
            format: :class:`~loom.etl.Format` of the output file.

        Returns:
            New ``IntoFile`` backed by a logical alias.

        Example::

            target = IntoFile.alias("exports_daily", format=Format.PARQUET)
        """
        # Bypass __init__ so the alias flag can be set without exposing it
        # as a constructor parameter.
        obj = cls.__new__(cls)
        obj._path = name
        obj._format = format
        obj._is_alias = True
        obj._write_options = None
        return obj

    def with_options(self, options: WriteOptions) -> IntoFile:
        """Return a new ``IntoFile`` with format-specific write options.

        Args:
            options: Format-specific write options — use
                :class:`~loom.etl.CsvWriteOptions` or
                :class:`~loom.etl.ParquetWriteOptions`.

        Returns:
            New ``IntoFile`` with the options applied at write time.

        Example::

            target = IntoFile("s3://exports/report.csv", format=Format.CSV)
                .with_options(CsvWriteOptions(separator=";"))
        """
        # Copy-on-write clone; the original builder stays untouched.
        clone = _clone_slots(self, IntoFile, IntoFile.__slots__)
        object.__setattr__(clone, "_write_options", options)
        return clone

    def _to_spec(self) -> Any:
        spec_kwargs = {
            "path": self._path,
            "format": self._format,
            "is_alias": self._is_alias,
            "write_options": self._write_options,
        }
        return FileSpec(**spec_kwargs)

    def __repr__(self) -> str:
        return "IntoFile({!r}, format={!r})".format(self._path, self._format)
class IntoTemp:
    """Declare an intermediate result that bypasses Delta and lives in tmp storage.

    The physical format is chosen automatically by
    :class:`~loom.etl.checkpoint.CheckpointStore` based on the DataFrame
    type returned by ``execute()``:

    * **Polars** — Arrow IPC via ``sink_ipc()`` (streaming write, no collect)
      and ``scan_ipc()`` (lazy, memory-mapped read with predicate pushdown).
    * **Spark** — Parquet directory via ``df.write.parquet()`` and
      ``spark.read.parquet()``. Cuts the lineage DAG; Photon-optimised.

    Use :class:`~loom.etl.FromTemp` in a downstream step to consume the result.

    Args:
        name: Logical name identifying this intermediate. By default the
            name must be unique across the pipeline — the compiler raises
            if two steps write to the same name.
        scope: Lifetime scope. Defaults to :attr:`~loom.etl.CheckpointScope.RUN`.
        append: When ``True``, multiple steps may write to this name; their
            outputs are concatenated and exposed as one logical intermediate
            (fan-in pattern). All writers for a given name must agree on
            the same ``append`` value — mixing ``True`` and ``False`` is a
            compile-time error.

    Example::

        # strict — only one step may write "normalized_orders"
        target = IntoTemp("normalized_orders")

        # fan-in — multiple partition steps write to the same intermediate
        target = IntoTemp("order_parts", append=True)
    """

    __slots__ = ("_append", "_name", "_scope")

    def __init__(
        self,
        name: str,
        *,
        scope: CheckpointScope = CheckpointScope.RUN,
        append: bool = False,
    ) -> None:
        self._append = append
        self._scope = scope
        self._name = name

    @property
    def temp_name(self) -> str:
        """Logical name of this intermediate."""
        return self._name

    @property
    def scope(self) -> CheckpointScope:
        """Lifetime scope."""
        return self._scope

    @property
    def append(self) -> bool:
        """Whether multiple steps may write to this name (fan-in)."""
        return self._append

    def _to_spec(self) -> Any:
        # Fan-in targets compile to a dedicated spec type; both carry the
        # same name/scope payload.
        spec_cls = TempFanInSpec if self._append else TempSpec
        return spec_cls(temp_name=self._name, temp_scope=self._scope)

    def __repr__(self) -> str:
        return "IntoTemp({!r}, scope={!r}, append={!r})".format(
            self._name, self._scope, self._append
        )