Source code for loom.streaming.nodes._table.common

"""IntoTable streaming sink node — writes typed events to Delta or SQLAlchemy tables.

IntoTable is a frozen dataclass that implements IntoSink structurally.
The compiler detects it via isinstance(node, IntoSink) without any
registration or inheritance.

Supported backends
------------------
- ``Backend.SQLALCHEMY``: bulk-inserts rows using SQLAlchemy async sessions.
  Requires an async bridge and a shared SQLAlchemy session manager from the
  Bytewax runtime.
  Install: ``pip install sqlalchemy``.
- ``Backend.DELTA``: stages rows as parquet parts in temporary files and
  appends them to Delta via delta-rs.
  Requires a ``uri`` key in the resolved config section.
  Install: ``pip install deltalake polars``.

Config example (streaming.sinks.<name>)::

    database:
      warehouse:
        url: postgresql+asyncpg://user:pass@host/db
        pool_pre_ping: true
        pool_size: 10

    streaming:
      sinks:
        orders_sink:
          database: warehouse
          table: orders
          chunk_size: 500

        events_sink:
          uri: s3://data-lake/events
          mode: append            # delta write mode (default: append)

SQLAlchemy example::

    from loom.core.config import ConfigContext
    from loom.core.repository.sqlalchemy.session_manager import SessionManager
    from loom.streaming.nodes._table import SqlAlchemyDatabaseConfig, SqlAlchemySinkConfig

    ctx = ConfigContext.from_yaml("config.yaml")
    sink_cfg = SqlAlchemySinkConfig.from_config(
        ctx.section("streaming.sinks.orders_sink", dict),
        default_table="orders",
    )
    db_cfg = SqlAlchemyDatabaseConfig.from_config(
        ctx.section(f"database.{sink_cfg.database}", dict),
    )
    session_manager = SessionManager.from_config(db_cfg.to_session_manager_config())

Delta example::

    from loom.core.config import ConfigContext
    from loom.streaming.nodes._table import DeltaSinkConfig

    ctx = ConfigContext.from_yaml("config.yaml")
    sink_cfg = DeltaSinkConfig.from_config(
        ctx.section("streaming.sinks.events_sink", dict),
        default_table="events",
    )
"""

from __future__ import annotations

import json
from collections import deque
from collections.abc import Mapping, Sequence
from contextlib import AbstractAsyncContextManager, suppress
from dataclasses import dataclass, field
from datetime import date, datetime, time
from decimal import Decimal
from enum import StrEnum
from tempfile import SpooledTemporaryFile
from time import monotonic
from types import UnionType
from typing import (
    Annotated,
    Any,
    ClassVar,
    Generic,
    Literal,
    Protocol,
    Self,
    TypeVar,
    Union,
    cast,
    get_args,
    get_origin,
    get_type_hints,
)
from uuid import uuid4

import msgspec
import msgspec.structs

from loom.core.async_bridge import AsyncBridge
from loom.core.model.types import _JsonMarker
from loom.streaming.core._message import StreamPayload
from loom.streaming.nodes._sink import SinkPartition

# ── optional SQLAlchemy dependency ────────────────────────────────────────────
# Imported once at module level. _SA_AVAILABLE stays False when sqlalchemy is
# not installed; every class and helper below can reference these names safely
# because from __future__ import annotations makes all annotations lazy strings.
try:
    import sqlalchemy as _sa
    from sqlalchemy import insert as _sa_insert

    Boolean = _sa.Boolean
    Column = _sa.Column
    DateTime = _sa.DateTime
    Integer = _sa.Integer
    MetaData = _sa.MetaData
    Numeric = _sa.Numeric
    SAJSON = _sa.JSON
    SAFloat = _sa.Float
    String = _sa.String
    Table = _sa.Table
    Text = _sa.Text

    # Defined inside the try block so all SA names are provably bound here.
    _SA_TYPE_MAP: dict[type, Any] = {
        int: Integer(),
        float: SAFloat(),
        bool: Boolean(),
        str: String(),
        datetime: DateTime(timezone=True),
        date: String(10),
        time: String(8),
        Decimal: Numeric(),
        bytes: Text(),
    }
    _SA_AVAILABLE = True
except ImportError:
    _sa_insert = cast(Any, None)
    _SA_TYPE_MAP = {}
    _SA_AVAILABLE = False

# ── optional Delta dependency ─────────────────────────────────────────────────
# Fallback assignments in the except branch ensure both names are always bound,
# removing pyright's "possibly unbound" diagnostic on write_batch call sites.
try:
    import polars as pl
    import pyarrow as _pa
    import pyarrow.ipc as _pa_ipc
    from deltalake.writer import WriterProperties, write_deltalake

    _writer_properties_factory = WriterProperties
    _DELTA_AVAILABLE = True
except ImportError:
    pl = cast(Any, None)
    _pa = cast(Any, None)
    _pa_ipc = cast(Any, None)
    _writer_properties_factory = cast(Any, None)
    write_deltalake = cast(Any, None)
    _DELTA_AVAILABLE = False

# ── optional ClickHouse dependency ────────────────────────────────────────────
try:
    import clickhouse_connect as _cc  # type: ignore[import-untyped]

    _CC_AVAILABLE = True
except ImportError:
    _cc = cast(Any, None)
    _CC_AVAILABLE = False

EventT = TypeVar("EventT", bound=StreamPayload)


class _SessionManagerLike(Protocol):
    """Minimal async session manager contract required by IntoTable."""

    def session(self) -> AbstractAsyncContextManager[Any]:
        """Return an async context manager yielding one SQLAlchemy session."""
        ...


_DELTA_MODES: frozenset[str] = frozenset({"error", "append", "ignore"})
_DEFAULT_BUFFER_MAX_RECORDS = 50_000
_DEFAULT_BUFFER_MAX_BYTES = 128 * 1024 * 1024
_DEFAULT_BUFFER_MAX_AGE_S = 60.0
_DEFAULT_FLUSH_ON_SHUTDOWN = True


[docs] class Backend(StrEnum): """Storage backend used by :class:`IntoTable` to write epoch batches. Args: SQLALCHEMY: Bulk-insert rows via SQLAlchemy Core. Transport-agnostic — works with any SA-supported database. DELTA: Stage parquet parts and append them to a Delta Lake table. Requires ``deltalake`` and ``polars`` to be installed. CLICKHOUSE: Bulk-insert rows via ``clickhouse-connect`` (native HTTP/2 protocol, no SQLAlchemy). Requires ``clickhouse-connect`` to be installed. Supports ``Array``, ``Nullable`` and all ClickHouse-native types without JSON serialisation workarounds. """ SQLALCHEMY = "sqlalchemy" DELTA = "delta" CLICKHOUSE = "clickhouse"
@dataclass(frozen=True, slots=True) class _TableBufferPolicy: """Common buffering policy shared by all IntoTable backends.""" max_records: int = _DEFAULT_BUFFER_MAX_RECORDS max_bytes: int = _DEFAULT_BUFFER_MAX_BYTES max_age_s: float = _DEFAULT_BUFFER_MAX_AGE_S flush_on_shutdown: bool = _DEFAULT_FLUSH_ON_SHUTDOWN @classmethod def from_config(cls, config: Mapping[str, Any]) -> _TableBufferPolicy: """Build a buffering policy from a resolved sink config.""" return cls( max_records=_optional_int( config.get("buffer_max_records"), _DEFAULT_BUFFER_MAX_RECORDS, ), max_bytes=_optional_int(config.get("buffer_max_bytes"), _DEFAULT_BUFFER_MAX_BYTES), max_age_s=float(config.get("buffer_max_age_s", _DEFAULT_BUFFER_MAX_AGE_S)), flush_on_shutdown=bool(config.get("flush_on_shutdown", _DEFAULT_FLUSH_ON_SHUTDOWN)), )
[docs] @dataclass(frozen=True) class IntoTable(Generic[EventT]): """Streaming terminal node that writes typed events to a storage table. Implements :class:`~loom.streaming.nodes.IntoSink` structurally — no base class or registration required. The compiler detects it via structural ``isinstance(node, IntoSink)`` and resolves its config from ``streaming.sinks.<name>``. Args: payload: Concrete event type written by this sink. table: Target table name. May be overridden by the resolved config section or left empty when the config supplies it. backend: Storage backend. Defaults to :attr:`Backend.SQLALCHEMY`. name: Config section key (``streaming.sinks.<name>``). Required when the sink is resolved from ``ConfigContext``. router_branch_safe: Always ``True`` — IntoTable may appear inside Router branches. Example:: Process( Decompose(StoreEventExpander), Router({ StoreRow: Process( IntoTable(payload=StoreRow, table="store", name="store_sink") ), LanguageRow: Process( IntoTable(payload=LanguageRow, table="language", name="lang_sink") ), }), ) """ payload: type[EventT] table: str = "" backend: Backend = Backend.SQLALCHEMY name: str = "" router_branch_safe: ClassVar[bool] = True
[docs] def build_partition( self, config: SqlAlchemySinkConfig | DeltaSinkConfig | ClickHouseSinkConfig, worker_index: int, worker_count: int, bridge: AsyncBridge | None = None, session_manager: _SessionManagerLike | None = None, logger: Any = None, ) -> SinkPartition[EventT]: """Build the per-worker partition for this sink. Args: config: Resolved ``streaming.sinks.<name>`` config section. worker_index: Zero-based index of the calling worker (unused). worker_count: Total number of workers (unused). bridge: Optional adapter-owned async bridge for SQLAlchemy sinks. session_manager: Optional adapter-owned SQLAlchemy session manager. Returns: A :class:`~loom.streaming.nodes.SinkPartition` ready to receive ``write_batch`` and ``close`` calls. Raises: TypeError: If the provided config does not match the selected backend. RuntimeError: If required runtime resources are missing. ValueError: If the resolved backend is not supported. """ del worker_index, worker_count if self.backend is Backend.SQLALCHEMY: if not isinstance(config, SqlAlchemySinkConfig): raise TypeError("IntoTable backend=sqlalchemy requires a SqlAlchemySinkConfig.") if bridge is None: raise RuntimeError( "IntoTable backend=sqlalchemy requires an AsyncBridge from the Bytewax runtime." ) if session_manager is None: raise RuntimeError( "IntoTable backend=sqlalchemy requires a shared " "SessionManager from the Bytewax runtime." ) return cast( SinkPartition[EventT], _SQLAlchemyTablePartition( config, self.payload, bridge, session_manager, logger=logger, ), ) if self.backend is Backend.DELTA: if not isinstance(config, DeltaSinkConfig): raise TypeError("IntoTable backend=delta requires a DeltaSinkConfig.") return cast( SinkPartition[EventT], _DeltaTablePartition(config, logger=logger), ) if self.backend is Backend.CLICKHOUSE: if not isinstance(config, ClickHouseSinkConfig): raise TypeError("IntoTable backend=clickhouse requires a ClickHouseSinkConfig.") return cast( SinkPartition[EventT], _ClickHouseTablePartition(config, self.payload, logger=logger), ) raise ValueError(f"Unsupported backend: {self.backend!r}")
# ── SQLAlchemy helpers ──────────────────────────────────────────────────────── # These functions reference module-level SA names (Column, Integer, …). # They are only called after _SA_AVAILABLE is confirmed in __init__, so # NameError cannot occur — no import statements appear inside them. def _optional_int(value: Any, default: int) -> int: if value is None: return default return int(value)
[docs] @dataclass(frozen=True, slots=True) class SqlAlchemyDatabaseConfig: """Resolved SQLAlchemy connection settings for a shared database entry. Args: url: SQLAlchemy async URL. echo: Enable SQL echo logging. pool_pre_ping: Check pooled connections before checkout. pool_size: Base connection pool size. max_overflow: Additional overflow connections. pool_timeout: Seconds to wait for a pooled connection. pool_recycle: Seconds before recycling pooled connections. connect_args: Extra arguments passed to the DBAPI connect call. """ url: str echo: bool = False pool_pre_ping: bool = True pool_size: int = 10 max_overflow: int = 20 pool_timeout: int = 30 pool_recycle: int = 1800 connect_args: dict[str, object] = field(default_factory=dict)
[docs] @classmethod def from_config(cls, config: Mapping[str, Any]) -> Self: """Build a SQLAlchemy connection config from a resolved YAML section.""" url = str(config.get("url") or "").strip() if not url: raise ValueError("SQLAlchemy database config requires a non-empty 'url'.") return cls( url=url, echo=bool(config.get("echo", False)), pool_pre_ping=bool(config.get("pool_pre_ping", True)), pool_size=_optional_int(config.get("pool_size"), 10), max_overflow=_optional_int(config.get("max_overflow"), 20), pool_timeout=_optional_int(config.get("pool_timeout"), 30), pool_recycle=_optional_int(config.get("pool_recycle"), 1800), connect_args=dict(config.get("connect_args") or {}), )
[docs] def to_session_manager_config(self) -> dict[str, object]: """Convert the config to ``SessionManager.from_config`` keyword mapping.""" return { "url": self.url, "echo": self.echo, "pool_pre_ping": self.pool_pre_ping, "pool_size": self.pool_size, "max_overflow": self.max_overflow, "pool_timeout": self.pool_timeout, "pool_recycle": self.pool_recycle, "connect_args": dict(self.connect_args), }
[docs] @dataclass(frozen=True, slots=True) class SqlAlchemySinkConfig: """Resolved ``IntoTable`` settings for the SQLAlchemy backend. Args: database: Shared ``database.<name>`` reference. Empty when using inline ``url`` resolution. url: Inline SQLAlchemy URL fallback. table: Final target table name. chunk_size: Number of rows per bulk ``executemany`` call. buffer_policy: Common flush policy shared with other backends. """ database: str url: str table: str chunk_size: int buffer_policy: _TableBufferPolicy
[docs] @classmethod def from_config(cls, config: Mapping[str, Any], *, default_table: str) -> Self: """Build SQLAlchemy sink settings from a resolved sink config.""" table = str(config.get("table") or default_table).strip() database = str(config.get("database") or "").strip() url = str(config.get("url") or "").strip() if not table: raise ValueError("SQLAlchemy sink config requires a non-empty table name.") return cls( database=database, url=url, table=table, chunk_size=_optional_int(config.get("chunk_size"), 500), buffer_policy=_TableBufferPolicy.from_config(config), )
[docs] @dataclass(frozen=True, slots=True) class DeltaSinkConfig: """Resolved ``IntoTable`` settings for the Delta backend. Args: uri: Delta table URI. table: Final target table name. mode: delta-rs write mode. storage_options: Object-store credentials/options. partition_by: Column names used to partition the Delta table. target_file_size: Optional delta-rs target file size in bytes. staging_compression: Compression codec for the IPC staging stream. spool_max_bytes: Maximum in-memory bytes before spooling the staging file to disk. part_max_records: Backward-compatible row-group hint for the final Delta write. writer_properties: Final Delta writer properties passed to delta-rs. buffer_policy: Common flush policy shared with other backends. """ uri: str table: str mode: Literal["error", "append", "ignore"] storage_options: dict[str, str] partition_by: tuple[str, ...] target_file_size: int | None staging_compression: Literal["uncompressed", "lz4", "zstd"] spool_max_bytes: int part_max_records: int writer_properties: Any | None buffer_policy: _TableBufferPolicy mini_batch_size: int = 500
[docs] @classmethod def from_config(cls, config: Mapping[str, Any], *, default_table: str) -> Self: """Build Delta sink settings from a resolved sink config.""" table = str(config.get("table") or default_table).strip() uri = str(config.get("uri") or "").strip() if not uri: raise ValueError("Delta sink config requires a non-empty 'uri'.") raw_partition_by = config.get("partition_by") or [] staging_config = config.get("staging") if isinstance(config.get("staging"), Mapping) else {} staging_compression = _validate_ipc_compression( staging_config.get("compression") if isinstance(staging_config, Mapping) else config.get("staging_compression", "uncompressed") ) writer_properties = _build_writer_properties( config.get("writer_properties") if isinstance(config.get("writer_properties"), Mapping) else {} ) if writer_properties is None: writer_properties = _writer_properties_factory( max_row_group_size=_optional_int(config.get("part_max_records"), 10_000) ) target_file_size_raw = config.get("target_file_size") return cls( uri=uri, table=table, mode=_validate_delta_mode(config.get("mode", "append")), storage_options={k: str(v) for k, v in (config.get("storage_options") or {}).items()}, partition_by=tuple(str(c) for c in raw_partition_by), target_file_size=int(target_file_size_raw) if target_file_size_raw is not None else None, staging_compression=staging_compression, spool_max_bytes=_optional_int(config.get("spool_max_bytes"), 32 * 1024 * 1024), part_max_records=_optional_int(config.get("part_max_records"), 10_000), writer_properties=writer_properties, buffer_policy=_TableBufferPolicy.from_config(config), mini_batch_size=_optional_int( staging_config.get("mini_batch_size") if isinstance(staging_config, Mapping) else config.get("mini_batch_size"), 500, ), )
@dataclass(frozen=True, slots=True) class ClickHouseSinkConfig: """Resolved ``IntoTable`` settings for the ClickHouse backend. Connection is established via ``clickhouse-connect`` using a standard ``clickhouse://`` DSN derived from the ``database.<name>.url`` entry in ``config.yaml``. The ``clickhouse+asynch://`` scheme used by ``clickhouse-sqlalchemy`` is normalised automatically — no config changes are required. Args: url: Normalised ``clickhouse://`` DSN for ``clickhouse-connect``. table: Target table name. buffer_policy: Common flush policy (max_records, max_bytes, max_age_s). insert_timeout: Seconds before a single insert call is aborted. ``None`` disables the timeout (default). """ url: str table: str buffer_policy: _TableBufferPolicy insert_timeout: float | None = None @classmethod def from_config(cls, config: Mapping[str, Any], *, default_table: str) -> Self: """Build ClickHouse sink settings from a resolved sink + database config.""" table = str(config.get("table") or default_table).strip() url = str(config.get("url") or "").strip() if not table: raise ValueError("ClickHouse sink config requires a non-empty table name.") if not url: raise ValueError("ClickHouse sink config requires a 'url' in the database section.") timeout_raw = config.get("insert_timeout") return cls( url=url, table=table, buffer_policy=_TableBufferPolicy.from_config(config), insert_timeout=float(timeout_raw) if timeout_raw is not None else None, ) def _validate_ipc_compression(value: object) -> Literal["uncompressed", "lz4", "zstd"]: compression = str(value or "uncompressed").strip().lower() if compression not in {"uncompressed", "lz4", "zstd"}: raise ValueError( "Invalid IPC compression " f"{compression!r}. Must be one of: ['lz4', 'uncompressed', 'zstd']" ) return cast(Literal["uncompressed", "lz4", "zstd"], compression) def _build_writer_properties(config: Mapping[str, Any] | None) -> Any | None: if not config: return None kwargs = dict(config) compression = kwargs.get("compression") if isinstance(compression, str): kwargs["compression"] = compression.upper() return _writer_properties_factory(**kwargs) def _sa_type_for(annotation: Any) -> Any: """Map a Python type annotation to a SQLAlchemy column type instance. Unwraps ``Optional[T]`` before mapping. Collections and unmapped types fall back to ``JSON()``. References module-level ``_SA_TYPE_MAP`` — no import statements inside. Args: annotation: A Python type or ``Optional`` / union variant. Returns: A SQLAlchemy column type instance. """ origin = get_origin(annotation) if origin is Annotated: base_type, *metadata = get_args(annotation) if any(isinstance(m, _JsonMarker) for m in metadata): return Text() return _sa_type_for(base_type) if origin in (Union, UnionType): non_none = [a for a in get_args(annotation) if a is not type(None)] return _sa_type_for(non_none[0]) if len(non_none) == 1 else SAJSON() if origin in (list, tuple, set, dict, frozenset): return SAJSON() if isinstance(annotation, type): return _SA_TYPE_MAP.get(annotation, SAJSON()) return SAJSON() def _sa_table_from_struct(table_name: str, payload_type: type[Any]) -> Table: """Build a SQLAlchemy Core ``Table`` from a msgspec Struct type. Resolves annotations via :func:`~typing.get_type_hints` and maps each Python type to the corresponding SA column type via :func:`_sa_type_for`. References module-level ``Column``, ``MetaData``, ``Table`` — no imports inside. Args: table_name: Target table name used in the SQL DDL. payload_type: A :class:`~loom.core.model.struct.LoomStruct` or :class:`~loom.core.model.struct.LoomFrozenStruct` subclass whose fields define the column layout. Returns: A :class:`~sqlalchemy.Table` bound to a fresh :class:`~sqlalchemy.MetaData`. """ hints = get_type_hints(payload_type) columns = [ Column(field.name, _sa_type_for(hints.get(field.name, object))) for field in msgspec.structs.fields(payload_type) ] return Table(table_name, MetaData(), *columns) def _row_size_bytes(row: Mapping[str, Any]) -> int: """Estimate the serialized size of one row for buffer accounting.""" return len(msgspec.json.encode(dict(row))) class _SQLAlchemyTablePartition: """Bulk-insert epoch batches into a relational table via SQLAlchemy async sessions. The partition reuses a shared SessionManager owned by the Bytewax adapter and uses the Bytewax async bridge to execute batch inserts without exposing event-loop management to callers. Column types are inferred from the msgspec Struct field annotations — no ORM mapping or reflection required. Args: settings: Resolved SQLAlchemy sink config. payload_type: Struct type whose fields map to table columns. bridge: Bytewax async bridge used to run async writes. session_manager: Shared SQLAlchemy session manager for this DB config. Raises: ImportError: If ``sqlalchemy`` is not installed. """ def __init__( self, settings: SqlAlchemySinkConfig, payload_type: type[Any], bridge: AsyncBridge, session_manager: _SessionManagerLike, logger: Any = None, ) -> None: if not _SA_AVAILABLE: raise ImportError( "SQLAlchemy backend requires 'sqlalchemy'. Install it with: pip install sqlalchemy" ) self._bridge = bridge self._session_manager = session_manager self._logger = logger self._table = _sa_table_from_struct(settings.table, payload_type) self._chunk_size = settings.chunk_size self._buffer_policy = settings.buffer_policy self._buffer: deque[tuple[dict[str, Any], int]] = deque() self._buffer_records = 0 self._buffer_bytes = 0 self._buffer_started_at: float | None = None self._closed = False def write_batch(self, items: Sequence[Any]) -> None: """Bulk-insert one epoch batch. Args: items: Struct instances to insert. Empty batches still check for age-based flushes so stale buffers drain on quiet epochs. """ if not items: if self._buffer and self._should_flush(): self._bridge.run(self._flush_async()) return for item in items: row = _to_row_dict(item) row_bytes = _row_size_bytes(row) if not self._buffer: self._buffer_started_at = monotonic() self._buffer.append((row, row_bytes)) self._buffer_records += 1 self._buffer_bytes += row_bytes if self._should_flush(): self._bridge.run(self._flush_async()) def close(self) -> None: """Release partition resources. The shared session manager is owned by the adapter runtime and disposed centrally when the Bytewax worker shuts down. """ if self._closed: return self._closed = True if self._buffer and self._buffer_policy.flush_on_shutdown: self._bridge.run(self._flush_async()) def _should_flush(self) -> bool: if self._closed or not self._buffer: return False if self._buffer_records >= self._buffer_policy.max_records: return True if self._buffer_bytes >= self._buffer_policy.max_bytes: return True if self._buffer_started_at is None: return False return monotonic() - self._buffer_started_at >= self._buffer_policy.max_age_s async def _flush_async(self) -> None: statement = _sa_insert(self._table) trace_id = uuid4().hex correlation_id = uuid4().hex async with self._session_manager.session() as session: try: n = 0 t0 = monotonic() while self._buffer: rows: list[dict[str, Any]] = [] while self._buffer and len(rows) < self._chunk_size: row, row_bytes = self._buffer.popleft() rows.append(row) self._buffer_records -= 1 self._buffer_bytes -= row_bytes if rows: await session.execute(statement, rows) n += len(rows) await session.commit() if self._logger is not None: self._logger.info( "sink_flush", trace_id=trace_id, correlation_id=correlation_id, backend="sqlalchemy", table=self._table.name, records=n, duration_ms=round((monotonic() - t0) * 1000, 1), ) except Exception: await session.rollback() raise self._buffer_started_at = None def _to_row_dict(item: Any) -> dict[str, Any]: """Serialise one LoomStruct instance to a plain dict.""" return msgspec.structs.asdict(cast(msgspec.Struct, item)) # ── Delta helpers ───────────────────────────────────────────────────────────── def _validate_delta_mode(value: object) -> Literal["error", "append", "ignore"]: mode = str(value) if mode not in _DELTA_MODES: raise ValueError( f"Invalid Delta write mode {mode!r}. Must be one of: {sorted(_DELTA_MODES)}" ) return mode # type: ignore[return-value] class _DeltaTablePartition: """Stage rows as a PyArrow IPC stream and append them to Delta. Rows accumulate in a plain ``list[dict]``; every ``mini_batch_size`` rows they are converted to a single ``pa.RecordBatch`` and written to a ``SpooledTemporaryFile``-backed IPC stream. At flush the stream is closed, rewound, and passed directly to ``write_deltalake`` as a ``RecordBatchReader`` — delta-rs iterates the batches without collecting them into a single in-memory frame. Args: settings: Resolved Delta sink config. Raises: ImportError: If ``deltalake``, ``polars``, or ``pyarrow`` are not installed. """ def __init__( self, settings: DeltaSinkConfig, logger: Any = None, ) -> None: if not _DELTA_AVAILABLE: raise ImportError( "Delta backend requires 'deltalake', 'polars' and 'pyarrow'. " "Install them with: pip install deltalake polars pyarrow" ) self._logger = logger self._uri = settings.uri self._mode = settings.mode self._storage_options = dict(settings.storage_options) self._partition_by = settings.partition_by self._buffer_policy = settings.buffer_policy self._spool_max_bytes = settings.spool_max_bytes self._target_file_size = settings.target_file_size self._staging_compression = settings.staging_compression self._writer_properties = settings.writer_properties self._mini_batch_size = settings.mini_batch_size self._mini_batch: list[dict[str, Any]] = [] self._ipc_writer: Any = None # pa.ipc.RecordBatchStreamWriter | None self._spool: Any = self._new_spool() self._staged_records = 0 self._staged_bytes = 0 self._buffer_started_at: float | None = None self._closed = False def _new_spool(self) -> Any: return SpooledTemporaryFile(max_size=self._spool_max_bytes) # noqa: SIM115 def _ipc_compression(self) -> Any: if self._staging_compression == "uncompressed": return None return self._staging_compression # "lz4" | "zstd" def write_batch(self, items: Sequence[Any]) -> None: """Accumulate items into mini-batches and flush when a threshold is reached. Args: items: Struct instances to write. Empty batches still check for age-based flushes so stale buffers drain on quiet epochs. """ if not items: if self._staged_records and self._should_flush(): self._flush() return for item in items: if self._staged_records == 0: self._buffer_started_at = monotonic() self._stage_item(item) if self._should_flush(): self._flush() def _stage_item(self, item: Any) -> None: row = msgspec.structs.asdict(cast(msgspec.Struct, item)) if not row: return self._mini_batch.append(row) self._staged_records += 1 if len(self._mini_batch) >= self._mini_batch_size: self._commit_mini_batch() def _commit_mini_batch(self) -> None: if not self._mini_batch: return batch = _pa.RecordBatch.from_pylist(self._mini_batch) if self._ipc_writer is None: options = _pa_ipc.IpcWriteOptions(compression=self._ipc_compression()) self._ipc_writer = _pa_ipc.new_stream(self._spool, batch.schema, options=options) if self._logger is not None: self._logger.debug( "delta_staging_stream_opened", uri=self._uri, schema_fields=len(batch.schema), compression=self._staging_compression, ) self._ipc_writer.write_batch(batch) pos = self._spool.tell() self._staged_bytes = pos if self._logger is not None: self._logger.debug( "delta_mini_batch_written", uri=self._uri, batch_rows=len(self._mini_batch), spool_bytes=pos, ) self._mini_batch.clear() def _should_flush(self) -> bool: if self._closed or not self._staged_records: return False if self._staged_records >= self._buffer_policy.max_records: return True if self._staged_bytes >= self._buffer_policy.max_bytes: if self._logger is not None: self._logger.warning( "delta_staging_max_bytes_reached", uri=self._uri, staged_bytes=self._staged_bytes, max_bytes=self._buffer_policy.max_bytes, ) return True if self._buffer_started_at is None: return False return monotonic() - self._buffer_started_at >= self._buffer_policy.max_age_s def _flush(self) -> None: self._commit_mini_batch() writer = self._ipc_writer if writer is None: return n = self._staged_records spool = self._spool self._spool = self._new_spool() self._ipc_writer = None self._staged_records = 0 self._staged_bytes = 0 self._buffer_started_at = None trace_id = uuid4().hex correlation_id = uuid4().hex t0 = monotonic() try: writer.close() spool.seek(0) reader = _pa_ipc.open_stream(spool) self._write_to_delta(reader, n, trace_id, correlation_id, t0) except Exception: if self._logger is not None: self._logger.error( "sink_flush_error", backend="delta", uri=self._uri, records=n, ) raise finally: spool.close() def _write_to_delta( self, reader: Any, n: int, trace_id: str, correlation_id: str, t0: float, ) -> None: kwargs: dict[str, Any] = { "mode": self._mode, "storage_options": self._storage_options or None, } if self._partition_by: kwargs["partition_by"] = list(self._partition_by) if self._target_file_size is not None: kwargs["target_file_size"] = self._target_file_size if self._writer_properties is not None: kwargs["writer_properties"] = self._writer_properties if self._logger is not None: self._logger.debug( "delta_write_start", uri=self._uri, records=n, mode=self._mode, ) write_deltalake(self._uri, reader, **kwargs) if self._logger is not None: self._logger.info( "sink_flush", trace_id=trace_id, correlation_id=correlation_id, backend="delta", uri=self._uri, records=n, duration_ms=round((monotonic() - t0) * 1000, 1), ) def close(self) -> None: """Flush staged records if configured and release all resources.""" if self._closed: return self._closed = True try: if self._staged_records and self._buffer_policy.flush_on_shutdown: self._flush() elif self._staged_records and self._logger is not None: self._logger.warning( "delta_staged_records_discarded_on_close", uri=self._uri, records=self._staged_records, ) finally: if self._ipc_writer is not None: with suppress(Exception): self._ipc_writer.close() self._spool.close() # ── ClickHouse helpers ──────────────────────────────────────────────────────── def _column_names_from_struct(payload_type: type[Any]) -> list[str]: """Return field names for a msgspec Struct in declaration order.""" return [f.name for f in msgspec.structs.fields(payload_type)] def _to_ch_row(row: Mapping[str, Any], column_names: list[str]) -> list[Any]: """Coerce one row dict to an ordered list for ``clickhouse-connect`` insert. Applies the minimal coercions that ClickHouse native types require: - ``bool`` → ``int`` (ClickHouse ``UInt8`` expects 0/1, not ``True``/``False``) - ``date`` → ISO-8601 string (``clickhouse-connect`` handles ``datetime`` natively) - ``dict`` / ``frozenset`` → ``json.dumps`` (no universal Map type in CH) All other Python types (``str``, ``int``, ``float``, ``datetime``, ``list``, ``None``) are passed through unchanged. """ result: list[Any] = [] for name in column_names: value = row[name] if isinstance(value, bool): result.append(int(value)) elif isinstance(value, date) and not isinstance(value, datetime): result.append(value.isoformat()) elif isinstance(value, (dict, frozenset)): result.append(json.dumps(value, ensure_ascii=False, sort_keys=True)) else: result.append(value) return result class _ClickHouseTablePartition: """Bulk-insert epoch batches into ClickHouse via ``clickhouse-connect``. Uses the native HTTP/2 client — no SQLAlchemy, no async bridge required. The buffer and flush policy mirror :class:`_DeltaTablePartition`: data accumulates in memory and is flushed as a single ``INSERT`` when any of the configured thresholds is reached. **Column-oriented buffer layout** — the internal buffer is a list of *column* lists rather than a list of *row* lists:: buffer = [ [col0_row0, col0_row1, ...], # one list per column [col1_row0, col1_row1, ...], ... ] This matches the ``column_oriented=True`` path in ``clickhouse-connect``, which skips the internal ``pivot()`` transposition and sends the data directly when the batch fits in a single network block (the common case). ``list[str]`` and other collection fields are handled by the native ClickHouse protocol without JSON serialisation workarounds. Args: settings: Resolved ClickHouse sink config. payload_type: Struct type whose fields define the column layout. Raises: ImportError: If ``clickhouse-connect`` is not installed. """ def __init__( self, settings: ClickHouseSinkConfig, payload_type: type[Any], logger: Any = None, ) -> None: if not _CC_AVAILABLE: raise ImportError( "ClickHouse backend requires 'clickhouse-connect'. " "Install it with: pip install clickhouse-connect" ) kwargs: dict[str, Any] = {} if settings.insert_timeout is not None: t = int(settings.insert_timeout) kwargs = {"connect_timeout": t, "send_receive_timeout": t} self._client = _cc.get_client(dsn=settings.url, **kwargs) self._logger = logger self._table = settings.table self._column_names = _column_names_from_struct(payload_type) self._buffer_policy = settings.buffer_policy # Column-oriented: one inner list per column, grows with each row. self._buffer: list[list[Any]] = [[] for _ in self._column_names] self._buffer_records = 0 self._buffer_bytes = 0 self._buffer_started_at: float | None = None self._closed = False def write_batch(self, items: Sequence[Any]) -> None: """Buffer items and flush to ClickHouse when a threshold is reached. Args: items: Struct instances to write. Empty batches still check for age-based flushes so stale buffers drain on quiet epochs. """ if not items: if self._buffer_records and self._should_flush(): self._flush() return for item in items: if not self._buffer_records: self._buffer_started_at = monotonic() row = _to_ch_row(_to_row_dict(item), self._column_names) row_bytes = sum(len(str(v)) for v in row) for col_idx, value in enumerate(row): self._buffer[col_idx].append(value) self._buffer_records += 1 self._buffer_bytes += row_bytes if self._should_flush(): self._flush() def close(self) -> None: """Flush remaining buffer and close the ClickHouse client.""" if self._closed: return self._closed = True try: if self._buffer_records and self._buffer_policy.flush_on_shutdown: self._flush() finally: self._client.close() def _should_flush(self) -> bool: if self._closed or not self._buffer_records: return False if self._buffer_records >= self._buffer_policy.max_records: return True if self._buffer_bytes >= self._buffer_policy.max_bytes: return True if self._buffer_started_at is None: return False return monotonic() - self._buffer_started_at >= self._buffer_policy.max_age_s def _flush(self) -> None: if not self._buffer_records: return n = self._buffer_records columns = self._buffer trace_id = uuid4().hex correlation_id = uuid4().hex t0 = monotonic() try: self._client.insert( self._table, columns, column_names=self._column_names, column_oriented=True, ) except Exception: if self._logger is not None: self._logger.error( "sink_flush_error", backend="clickhouse", table=self._table, records=n, ) raise self._buffer = [[] for _ in self._column_names] self._buffer_records = 0 self._buffer_bytes = 0 self._buffer_started_at = None if self._logger is not None: self._logger.info( "sink_flush", trace_id=trace_id, correlation_id=correlation_id, backend="clickhouse", table=self._table, records=n, duration_ms=round((monotonic() - t0) * 1000, 1), ) __all__ = [ "Backend", "ClickHouseSinkConfig", "DeltaSinkConfig", "IntoTable", "SqlAlchemyDatabaseConfig", "SqlAlchemySinkConfig", ]