# Source code for loom.etl.declarative.source._from_clickhouse
"""ClickHouse source builder — FromClickHouse."""
from __future__ import annotations
from typing import Any
from loom.core.routing import LogicalRef
from loom.etl.declarative.expr._refs import TableRef
from loom.etl.declarative.source._specs import ClickHouseSourceSpec
from loom.etl.schema._contract import SchemaContract, resolve_schema
from loom.etl.schema._schema import ColumnSchema
class FromClickHouse:
    """Declare a ClickHouse table or view as an ETL source.

    The builder mirrors :class:`~loom.etl.declarative.source.FromTable` so it
    fits the same authoring pattern:

    * ``where(...)`` for predicates
    * ``columns(...)`` for scan-time projection
    * ``with_schema(...)`` for Loom schema contracts
    * ``distinct()`` and ``unbounded()`` for query shaping / explicit scans

    Builder methods never mutate the receiver: each returns a fresh copy with
    one field changed, so a partially configured source can be reused as the
    base for several variants.
    """

    __slots__ = (
        "_ref",
        "_predicates",
        "_schema",
        "_columns",
        "_distinct",
        "_allow_full_scan",
    )

    def __init__(self, ref: str | LogicalRef | TableRef) -> None:
        """Create a source for *ref* with no filters, projection or schema.

        Args:
            ref: Table/view reference. A ``TableRef`` is used as-is; any other
                value is wrapped with ``TableRef(ref)``.
        """
        self._ref: TableRef = ref if isinstance(ref, TableRef) else TableRef(ref)
        self._predicates: tuple[Any, ...] = ()
        self._schema: tuple[ColumnSchema, ...] = ()
        self._columns: tuple[str, ...] = ()
        self._distinct: bool = False
        # Full scans are opt-in via unbounded(); _to_spec() enforces this.
        self._allow_full_scan: bool = False

    @property
    def table_ref(self) -> TableRef:
        """The ClickHouse table/view reference."""
        return self._ref

    @property
    def predicates(self) -> tuple[Any, ...]:
        """Declared filter predicates, in the order they were added."""
        return self._predicates

    def with_schema(self, schema: SchemaContract) -> FromClickHouse:
        """Return a new source with a Loom schema contract attached.

        The contract is normalized with ``resolve_schema`` and replaces any
        schema attached earlier.
        """
        return self._clone(_schema=resolve_schema(schema))

    def where(self, *predicates: Any) -> FromClickHouse:
        """Return a new source with the given predicates appended.

        Predicates accumulate across calls; earlier ones are kept.
        """
        return self._clone(_predicates=self._predicates + predicates)

    def columns(self, *cols: str) -> FromClickHouse:
        """Return a new source projecting only the listed columns.

        The projection replaces any earlier ``columns(...)`` / ``select(...)``.

        Raises:
            ValueError: If no column name is given.
            TypeError: If any argument is not a string (for example a list
                passed without unpacking).
        """
        if not cols:
            raise ValueError("FromClickHouse.columns() requires at least one column name.")
        bad = [c for c in cols if not isinstance(c, str)]
        if bad:
            # Catch columns(["a", "b"]) early instead of storing a list as a
            # "column name" that only fails later at query-compilation time.
            raise TypeError(
                "FromClickHouse.columns() expects column names as separate string "
                f"arguments; got {bad[0]!r}. Use .columns(*names) or .select(names)."
            )
        return self._clone(_columns=cols)

    def select(self, columns: list[str]) -> FromClickHouse:
        """Compatibility alias for :meth:`columns`.

        Args:
            columns: Iterable of column names. A bare string is treated as a
                single column name rather than being split into characters.
        """
        if isinstance(columns, str):
            # str is iterable, so *columns would otherwise unpack "user_id"
            # into "u", "s", "e", ... and silently project bogus columns.
            columns = [columns]
        return self.columns(*columns)

    def distinct(self) -> FromClickHouse:
        """Return a new source marked as DISTINCT."""
        return self._clone(_distinct=True)

    def unbounded(self) -> FromClickHouse:
        """Opt into a full-table scan explicitly.

        Without this (or at least one predicate), :meth:`_to_spec` refuses to
        compile the source.
        """
        return self._clone(_allow_full_scan=True)

    def _to_spec(self, alias: str) -> ClickHouseSourceSpec:
        """Compile the declarative builder into a frozen source spec.

        Args:
            alias: Name under which the source is exposed in the spec.

        Raises:
            ValueError: If no predicate was declared and :meth:`unbounded`
                was not called, to prevent accidental full-table scans.
        """
        if not self._predicates and not self._allow_full_scan:
            raise ValueError(
                f"FromClickHouse({self._ref.ref!r}) has no predicates. "
                "Use .where() to filter rows or .unbounded() for an explicit full scan."
            )
        return ClickHouseSourceSpec(
            alias=alias,
            table_ref=self._ref,
            predicates=self._predicates,
            columns=self._columns,
            schema=self._schema,
            distinct=self._distinct,
            allow_full_scan=self._allow_full_scan,
        )

    def _clone(self, **overrides: Any) -> FromClickHouse:
        """Return a copy of this builder with selected slots replaced.

        The copy has the same concrete type as ``self``, so subclasses keep
        their type through the fluent API. Only the slots declared on
        :class:`FromClickHouse` are copied; a subclass that adds its own
        state must override this method.

        Raises:
            TypeError: If an override names an unknown slot. This guards
                against typos that would otherwise be dropped silently.
        """
        # Read the base-class tuple explicitly: on a subclass that declares
        # its own __slots__, self.__slots__ would hide these fields.
        fields = FromClickHouse.__slots__
        unknown = sorted(set(overrides).difference(fields))
        if unknown:
            raise TypeError(f"FromClickHouse._clone() got unknown field(s): {', '.join(unknown)}")
        new = object.__new__(type(self))
        for slot in fields:
            value = overrides[slot] if slot in overrides else getattr(self, slot)
            object.__setattr__(new, slot, value)
        return new

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._ref.ref!r})"
# Names exported by ``from <module> import *``; only the builder class is public.
__all__ = ["FromClickHouse"]