Source code for loom.streaming.nodes._expand_routes
"""Fan-out node that expands one event into typed sub-collections and routes each to its Process."""
from __future__ import annotations
from collections.abc import Mapping
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from loom.streaming.core._message import StreamPayload
from loom.streaming.nodes._decompose import PayloadExpander
if TYPE_CHECKING:
from loom.streaming.graph._flow import Process
else:
class Process:
@classmethod
def __class_getitem__(cls, item: object) -> type[Process]:
del item
return cls
InT = TypeVar("InT", bound=StreamPayload)
[docs]
class ExpandRoutes(Generic[InT]):
"""Decompose one event into typed sub-collections and route each to its own Process.
Unlike Broadcast (which copies the same message to every route), ExpandRoutes
calls expander.expand() exactly once per event and routes each output type's
rows to the matching Process. Zero redundant reprocessing.
ExpandRoutes is terminal — no process nodes may follow it in the parent Process.
Args:
expander: PayloadExpander subclass that produces typed sub-collections.
routes: Mapping from output type to the Process that handles those rows.
default: Optional fallback Process for types not declared in routes.
Raises:
ValueError: If routes is empty and default is None.
"""
__slots__ = ("_default", "_expander", "_routes")
def __init__(
self,
*,
expander: type[PayloadExpander[InT]],
routes: dict[type, Process[Any, Any]],
default: Process[Any, Any] | None = None,
) -> None:
if not routes and default is None:
raise ValueError("ExpandRoutes requires at least one route or a default Process.")
self._expander = expander
self._routes: Mapping[type, Process[Any, Any]] = MappingProxyType(dict(routes))
self._default = default
@property
def expander(self) -> type[PayloadExpander[InT]]:
"""PayloadExpander subclass used to decompose each event."""
return self._expander
@property
def routes(self) -> Mapping[type, Process[Any, Any]]:
"""Mapping from output type to its handling Process."""
return self._routes
@property
def default(self) -> Process[Any, Any] | None:
"""Fallback Process for types not declared in routes."""
return self._default
__all__ = ["ExpandRoutes"]