Source code for loom.streaming.bytewax._sink_registry

"""Sink registry for StreamingRunner.

Defines the RegisteredSink protocol, RuntimeSinkBinding struct, and SinkRegistry
that the runner uses to resolve sinks from YAML config at startup.
"""

from __future__ import annotations

from typing import Any, ClassVar, Literal, Protocol, runtime_checkable

from loom.core.config import ConfigContext
from loom.core.model import LoomFrozenStruct
from loom.streaming.core._errors import ErrorKind
from loom.streaming.core._exceptions import DuplicateErrorSinkError


[docs] @runtime_checkable class RegisteredSink(Protocol): """Protocol a registrable sink class must satisfy. Attributes: sink_type: Unique type identifier. Must match the ``type`` field in the YAML config. config_type: msgspec Struct type used to deserialize the YAML config section. Example:: class ClickHouseErrorTableSink: sink_type = "clickhouse_error_table" config_type = ClickHouseErrorTableSinkConfig @classmethod def build_binding(cls, cfg, ctx): ... return RuntimeSinkBinding(purpose="errors", sink=..., kinds=(ErrorKind.TASK,)) """ sink_type: ClassVar[str] config_type: ClassVar[type] @classmethod def build_binding(cls, cfg: Any, ctx: ConfigContext) -> RuntimeSinkBinding: ...
[docs] class RuntimeSinkBinding(LoomFrozenStruct, frozen=True): """Resolved sink binding returned by a RegisteredSink. Args: purpose: Role of this sink within the runner. sink: Instantiated sink object ready for use. kinds: For ``purpose="errors"``, the ErrorKind values this sink handles. """ purpose: Literal["errors", "terminal", "audit"] sink: object kinds: tuple[ErrorKind, ...] = ()
class SinkRegistry: """Holds registered sink classes and resolves them against a ConfigContext. Each StreamingRunner owns one SinkRegistry. Not thread-safe; intended for startup-time registration only. """ def __init__(self) -> None: self._sinks: dict[str, Any] = {} def __contains__(self, item: object) -> bool: """Return True if the class is registered in this registry.""" return item in self._sinks.values() def register(self, cls: type) -> None: """Register a sink class. Args: cls: Class that satisfies the RegisteredSink protocol. Raises: TypeError: If cls does not implement RegisteredSink. """ if not isinstance(cls, RegisteredSink): raise TypeError( f"{cls!r} does not satisfy the RegisteredSink protocol. " "Required class attributes: sink_type (str), config_type (type), " "and classmethod build_binding(cfg, ctx)." ) self._sinks[cls.sink_type] = cls def resolve(self, ctx: ConfigContext) -> list[RuntimeSinkBinding]: """Resolve all registered sinks against the given config. Iterates streaming.sinks.* YAML entries, matches by type field, deserializes the section, calls build_binding, and validates for duplicate ErrorKind assignments. Args: ctx: Config context to read streaming.sinks from. Returns: List of RuntimeSinkBinding, one per matched YAML entry. Raises: DuplicateErrorSinkError: When two sinks claim the same ErrorKind. """ if not ctx.has("streaming.sinks"): return [] raw_sinks: dict[str, Any] = ctx.section_or_default("streaming.sinks", dict, {}) bindings: list[RuntimeSinkBinding] = [] for yaml_key, entry in raw_sinks.items(): sink_type = entry.get("type") if isinstance(entry, dict) else None if sink_type is None or sink_type not in self._sinks: continue cls = self._sinks[sink_type] cfg = ctx.section(f"streaming.sinks.{yaml_key}", cls.config_type) binding = cls.build_binding(cfg, ctx) bindings.append(binding) _validate_no_kind_duplicates(bindings) return bindings def _validate_no_kind_duplicates(bindings: list[RuntimeSinkBinding]) -> None: seen: dict[ErrorKind, str] = {} for binding in bindings: if binding.purpose != "errors": continue for kind in binding.kinds: if kind in seen: raise DuplicateErrorSinkError( f"ErrorKind.{kind} is claimed by more than one registered sink." ) seen[kind] = repr(binding.sink) __all__ = ["RegisteredSink", "RuntimeSinkBinding", "SinkRegistry"]