Source code for loom.streaming.nodes._broadcast
"""Broadcast node — inclusive fan-out to multiple independent branches."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from loom.core.model import LoomFrozenStruct
from loom.streaming.core._message import StreamPayload
from loom.streaming.nodes._boundary import IntoTopic
if TYPE_CHECKING:
    # Only static type checkers take this branch and see the real class.
    # NOTE(review): presumably deferred to avoid a circular import with
    # loom.streaming.graph._flow — confirm.
    from loom.streaming.graph._flow import Process
else:

    class Process:
        """Runtime placeholder bound to the name ``Process``.

        Subscripting the placeholder (``Process[A, B]``) hands back the
        placeholder class itself, whatever the subscript is.
        """

        @classmethod
        def __class_getitem__(cls, item: object) -> type[Process]:
            """Return ``cls`` unchanged; the subscript ``item`` is ignored."""
            # Explicitly discard the unused subscript argument.
            del item
            return cls
# Payload type of the messages entering a broadcast branch; must be a
# StreamPayload subtype.
InT = TypeVar("InT", bound=StreamPayload)
# Payload type produced by a branch's process and written to its optional
# output topic; must be a StreamPayload subtype.
OutT = TypeVar("OutT", bound=StreamPayload)
[docs]
class BroadcastRoute(LoomFrozenStruct, Generic[InT, OutT], frozen=True):
    """Single branch declaration consumed by a :class:`Broadcast` node.

    Pattern:
        Fan-out branch.

    Args:
        process: Chain of transformation nodes that every message entering
            this branch is passed through.
        output: Terminal Kafka topic that receives the branch's transformed
            messages. Defaults to ``None``, in which case the branch
            discards its results once the inner process has completed.

    Example::

        BroadcastRoute(
            process=Process(RecordStep(to_analytics_event)),
            output=IntoTopic("events.analytics", payload=AnalyticsEvent),
        )
    """

    process: Process[InT, OutT]
    output: IntoTopic[OutT] | None = None
[docs]
class Broadcast(Generic[InT]):
    """Terminal node that hands each incoming message to every declared branch.

    Pattern:
        Inclusive fan-out.

    Where :class:`~loom.streaming.Fork` selects exactly one branch per
    message, ``Broadcast`` copies the message to all of its branches. The
    branches are independent of one another: each may run its own
    transformations and write to its own output topic.

    ``Broadcast`` is terminal — no process nodes may follow it.

    Args:
        *routes: One or more :class:`BroadcastRoute` declarations.

    Raises:
        ValueError: If no routes are provided.

    Example::

        Broadcast(
            BroadcastRoute(
                process=Process(RecordStep(to_analytics_event)),
                output=IntoTopic("events.analytics", payload=AnalyticsEvent),
            ),
            BroadcastRoute(
                process=Process(RecordStep(to_fulfillment_order)),
                output=IntoTopic("orders.fulfillment", payload=FulfillmentOrder),
            ),
        )
    """

    __slots__ = ("_routes",)

    def __init__(self, *routes: BroadcastRoute[InT, Any]) -> None:
        """Store the declared branches, rejecting an empty declaration."""
        if routes:
            # ``*routes`` arrives as a tuple and is kept as-is, so the
            # declaration order is preserved.
            self._routes = routes
        else:
            raise ValueError("Broadcast requires at least one route.")

    @property
    def routes(self) -> tuple[BroadcastRoute[InT, Any], ...]:
        """Fan-out branches, in the order they were passed to the constructor."""
        return self._routes
# Names exported by ``from ... import *``: the node and its branch declaration.
__all__ = ["Broadcast", "BroadcastRoute"]