Source code for loom.streaming.compiler._plan

"""Compiled plan structures and compiler errors - output of the compiler, input to the adapter."""

from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any, ClassVar, Literal

from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.core._errors import ErrorKind
from loom.streaming.kafka._config import ConsumerSettings, ProducerSettings
from loom.streaming.kafka._wire import DispatchTable
from loom.streaming.mongo import MongoSourceConfig
from loom.streaming.nodes._boundary import PartitionPolicy
from loom.streaming.nodes._shape import StreamShape
from loom.streaming.nodes._table.common import (
    ClickHouseSinkConfig,
    DeltaSinkConfig,
    SqlAlchemyDatabaseConfig,
    SqlAlchemySinkConfig,
)


@dataclass(frozen=True)
class CompiledSingleSource:
    """Kafka source resolved to exactly one payload type.

    Instances are immutable (frozen dataclass).

    Args:
        settings: Resolved Kafka consumer settings.
        topics: Names of the Kafka topics covered by this source.
        payload_type: The one struct class (``LoomStruct`` or
            ``LoomFrozenStruct`` subclass) associated with this source.
        shape: Stream shape of the source.
        decode_strategy: ``"record"`` or ``"batch"``.
    """

    # Class-level constant rather than a constructor argument (ClassVar);
    # it is ``True`` for this source variant.
    needs_decode: ClassVar[bool] = True

    settings: ConsumerSettings
    topics: tuple[str, ...]
    payload_type: type[LoomStruct | LoomFrozenStruct]
    shape: StreamShape
    decode_strategy: Literal["record", "batch"]
@dataclass(frozen=True)
class CompiledMultiSource:
    """Heterogeneous Kafka source resolved together with its dispatch table.

    Instances are immutable (frozen dataclass).

    Args:
        settings: Resolved Kafka consumer settings.
        topics: Kafka topic names to subscribe to.
        dispatch: Pre-built table that maps ``message_type`` strings and
            error ``payload_type`` strings to their concrete Python types,
            and also carries the explicit wire-error payload types.
        shape: Declared source shape.
        decode_strategy: ``"record"`` to decode records one by one,
            ``"batch"`` to decode them in batches.
    """

    # Class-level constant rather than a constructor argument (ClassVar);
    # it is ``True`` for this source variant.
    needs_decode: ClassVar[bool] = True

    settings: ConsumerSettings
    topics: tuple[str, ...]
    dispatch: DispatchTable
    shape: StreamShape
    decode_strategy: Literal["record", "batch"]
@dataclass(frozen=True)
class CompiledMongoCDCSource:
    """MongoDB CDC source after configuration has been resolved.

    Instances are immutable (frozen dataclass).

    Args:
        settings: Resolved MongoDB source configuration.
        collections: Collection names covered by this source.
        watch_options: String-keyed mapping of watch options.
            NOTE(review): presumably forwarded to the change-stream
            ``watch`` call - confirm against the adapter.
        shape: Stream shape of the source.
    """

    # Class-level constant rather than a constructor argument (ClassVar);
    # it is ``False`` for this source variant.
    needs_decode: ClassVar[bool] = False

    settings: MongoSourceConfig
    collections: tuple[str, ...]
    watch_options: Mapping[str, object]
    shape: StreamShape
@dataclass(frozen=True)
class CompiledSink:
    """Kafka sink after resolution, including its partitioning policy.

    Instances are immutable (frozen dataclass).

    Args:
        settings: Resolved Kafka producer settings.
        topic: Name of the destination topic.
        partition_policy: Partition policy for the sink's records, or
            ``None`` when no policy is set.
        dlq_topic: Optional DLQ topic name; defaults to ``None``.
    """

    settings: ProducerSettings
    topic: str
    partition_policy: PartitionPolicy[LoomStruct | LoomFrozenStruct] | None
    dlq_topic: str | None = None
@dataclass(frozen=True)
class CompiledStorageSink:
    """Storage sink resolved to its DSL node plus a pre-fetched config section.

    At startup the adapter invokes
    ``node.build_partition(config, worker_index, worker_count, bridge)`` once
    for every Bytewax worker. The returned :class:`SinkPartition` is what
    handles epoch writes and shutdown. Backends that need async execution are
    handed the adapter-managed :class:`~loom.core.async_bridge.AsyncBridge`.

    Instances are immutable (frozen dataclass).

    Args:
        node: The :class:`~loom.streaming.nodes.IntoSink` node as declared in
            the DSL.
        config: Resolved storage sink config for the selected backend.
        database_config: Resolved shared SQLAlchemy database config, if
            needed; defaults to ``None``.
    """

    node: Any
    config: SqlAlchemySinkConfig | DeltaSinkConfig | ClickHouseSinkConfig
    database_config: SqlAlchemyDatabaseConfig | None = None
@dataclass(frozen=True)
class CompiledNode:
    """A DSL node paired with the stream shapes on either side of it.

    Instances are immutable (frozen dataclass).

    Args:
        node: The DSL node being annotated.
        input_shape: Input shape annotated on the node.
        output_shape: Output shape annotated on the node.
        path: Tuple of integers; defaults to the empty tuple.
            NOTE(review): presumably the node's index path inside the
            flow - confirm against the compiler.
    """

    node: object
    input_shape: StreamShape
    output_shape: StreamShape
    path: tuple[int, ...] = ()
class CompilationError(Exception):
    """Raised when a StreamFlow fails validation or cannot be compiled.

    The exception message is formatted once, at construction time, from the
    supplied error strings.

    Args:
        errors: List of human-readable error messages.

    Attributes:
        errors: Snapshot (copy) of the messages given to the constructor.
    """

    def __init__(self, errors: list[str]) -> None:
        # Copy the list: the message below is computed only once, so sharing
        # the caller's list would let ``errors`` drift away from ``str(exc)``.
        self.errors = list(errors)
        summary = "; ".join(self.errors)
        super().__init__(f"Compilation failed with {len(self.errors)} error(s): {summary}")

    def __reduce__(self) -> tuple[object, ...]:
        """Describe how ``pickle`` and ``copy`` should rebuild this exception.

        ``Exception.args`` holds the formatted message, while ``__init__``
        expects the list of errors. The inherited implementation would call
        the constructor with the message string and produce a garbled
        exception, so the error list is replayed instead.

        Returns:
            A ``(callable, args, state)`` reduce tuple.
        """
        # Pass the instance ``__dict__`` as state, as ``BaseException`` does,
        # so extra attributes (for example notes) survive the round trip.
        return (type(self), (self.errors,), self.__dict__ or None)
# This union is built when the module is imported, so the three source
# classes it names have to be defined before this statement runs.
CompiledSource = (
    CompiledSingleSource
    | CompiledMultiSource
    | CompiledMongoCDCSource
)
"""Any of the compiled source variants that a :class:`CompiledPlan` accepts."""
@dataclass(frozen=True)
class CompiledPlan:
    """Immutable outcome of compiling a StreamFlow.

    The dataclass is frozen, so attributes cannot be rebound after
    construction; the dict-valued fields themselves remain ordinary dicts.

    Args:
        name: Name of the plan.
        source: The compiled source (any ``CompiledSource`` variant).
        nodes: Compiled nodes, as a tuple.
        output: Compiled Kafka sink used as output, or ``None``.
        error_routes: Compiled sink for each ``ErrorKind`` that has a route.
        needs_async_bridge: Boolean flag.
            NOTE(review): presumably tells the adapter whether it must
            provide an async bridge - confirm.
        terminal_sinks: Compiled Kafka sinks keyed by a tuple of integers;
            defaults to a new empty dict per instance.
            NOTE(review): the keys look like node paths (compare
            ``CompiledNode.path``) - confirm.
        terminal_storage_sinks: Compiled storage sinks keyed the same way;
            defaults to a new empty dict per instance.
    """

    name: str
    source: CompiledSource
    nodes: tuple[CompiledNode, ...]
    output: CompiledSink | None
    error_routes: dict[ErrorKind, CompiledSink]
    needs_async_bridge: bool
    # default_factory gives every plan its own dict instead of a shared one.
    terminal_sinks: dict[tuple[int, ...], CompiledSink] = field(default_factory=dict)
    terminal_storage_sinks: dict[tuple[int, ...], CompiledStorageSink] = field(
        default_factory=dict
    )