Source code for loom.streaming.nodes._sink
"""Storage sink protocols for the streaming DSL.
Defines the two-level contract between the streaming graph and any storage backend:
- SinkPartition: what runs per Bytewax worker (write_batch / close).
- IntoSink: what any Into* terminal node exposes to the compiler and adapter.
The compiler detects IntoSink nodes by structural protocol check — it never
inspects type names. Any frozen dataclass that satisfies IntoSink is a
first-class terminal node without registration or inheritance.
"""
from __future__ import annotations
from collections.abc import Sequence
from typing import Protocol, TypeVar, runtime_checkable
from loom.core.async_bridge import AsyncBridge
from loom.streaming.core._message import StreamPayload
EventT_contra = TypeVar("EventT_contra", bound=StreamPayload, contravariant=True)
EventT = TypeVar("EventT", bound=StreamPayload) # invariant — used where type[T] appears
[docs]
class SinkPartition(Protocol[EventT_contra]):
"""Per-worker write interface for a storage sink.
The adapter calls write_batch() once per Bytewax epoch and close() on
shutdown. Implementations must be sync-safe — use AsyncBridge for storage
targets that require async I/O (e.g. SQLAlchemy async sessions).
Args:
EventT_contra: Contravariant event type this partition accepts.
"""
[docs]
def write_batch(self, items: Sequence[EventT_contra]) -> None:
"""Write a batch of events for this epoch.
Args:
items: Events delivered by Bytewax for the current epoch.
"""
...
[docs]
def close(self) -> None:
"""Release resources and flush any pending data.
Called once per worker on shutdown or after a failed run. Must be
idempotent — the adapter may call close() even when write_batch()
was never invoked.
"""
...
[docs]
@runtime_checkable
class IntoSink(Protocol[EventT]):
"""Structural protocol satisfied by all Into* terminal storage sink nodes.
The compiler detects nodes implementing this protocol through a structural
isinstance check and resolves their config from streaming.sinks.<name>.
The adapter calls build_partition() once per Bytewax worker to obtain the
SinkPartition that handles epoch writes and shutdown.
Implementing a custom Into* node requires no framework imports, no
registration, and no base class — any frozen dataclass with the four
declared members satisfies this protocol and is treated identically to
built-in nodes such as IntoTable.
Example::
@dataclass(frozen=True)
class IntoMongo(Generic[EventT]):
payload: type[EventT]
collection: str
name: str = ""
router_branch_safe: ClassVar[bool] = True
def build_partition(
self,
config,
worker_index,
worker_count,
bridge=None,
session_manager=None,
):
return MongoPartition(url=config["url"], collection=self.collection)
Args:
EventT_contra: Contravariant event type this sink accepts.
"""
payload: type[EventT]
name: str
router_branch_safe: bool # ClassVar[bool] = True on all implementations
[docs]
def build_partition(
self,
config: object,
worker_index: int,
worker_count: int,
bridge: AsyncBridge | None = None,
session_manager: object | None = None,
) -> SinkPartition[EventT]:
"""Build the per-worker partition for this sink.
Called once per Bytewax worker at startup. The returned SinkPartition
is owned by that worker for the lifetime of the run.
Args:
config: Resolved streaming.sinks.<name> config section.
worker_index: Zero-based index of the worker calling this method.
worker_count: Total number of workers in this run.
bridge: Optional async bridge provided by the Bytewax runtime.
session_manager: Optional adapter-owned backend resource for async SQLAlchemy sinks.
Returns:
A SinkPartition that will handle write_batch() and close() calls.
"""
...
__all__ = ["IntoSink", "SinkPartition"]