# Source code for loom.streaming.nodes._router

"""Streaming router declarations."""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, ClassVar, Generic, TypeVar

from loom.core.model import LoomFrozenStruct
from loom.streaming.core._message import StreamPayload
from loom.streaming.nodes._expr_eval import (
    PredicateSpec,
    SelectorSpec,
    evaluate_predicate,
    select_value,
)

if TYPE_CHECKING:
    # Only static analysis imports the real flow type.
    from loom.streaming.graph._flow import Process
else:

    class Process:
        """Runtime stand-in for the flow ``Process`` type.

        The real class is imported only under ``TYPE_CHECKING``. This
        placeholder exists so that subscripted forms such as
        ``Process[InT, OutT]`` can still be evaluated at runtime.
        """

        # ``__class_getitem__`` is implicitly a class method when defined in
        # a class body, so no explicit decorator is required.
        def __class_getitem__(cls, item: object) -> type[Process]:
            """Ignore the subscript and hand back the placeholder class.

            Args:
                item: Subscript argument(s); unused.

            Returns:
                The placeholder class itself.
            """
            del item  # deliberately unused
            return cls


# Type parameter used for predicates, selectors, and the first parameter of
# ``Process[InT, OutT]``; constrained to StreamPayload subtypes.
InT = TypeVar("InT", bound=StreamPayload)
# Second type parameter of ``Process[InT, OutT]``; presumably the payload type
# a routed process emits (confirm against the flow module). Also bound to
# StreamPayload.
OutT = TypeVar("OutT", bound=StreamPayload)


class Route(LoomFrozenStruct, Generic[InT, OutT], frozen=True):
    """A single branch of a predicate-based router.

    Pairs a predicate with the process to run when that predicate matches.
    Declared with ``frozen=True``.

    Pattern:
        Route declaration.

    Args:
        when: Predicate expression or custom predicate.
        process: Process executed when the predicate matches.
    """

    # Field order is part of the declaration: predicate first, then target.
    when: PredicateSpec[InT]
    process: Process[InT, OutT]
class Router(Generic[InT, OutT]):
    """Declarative description of a routing step in a stream.

    Pattern:
        In-place routing.

    Build instances with :meth:`by` (dispatch on a selected key) or
    :meth:`when` (ordered predicate dispatch). A router only declares graph
    shape; runtime execution belongs to backend adapters.
    """

    __slots__ = ("_selector", "_routes", "_predicate_routes", "_default")

    # Class-level flag; presumably consulted by graph tooling (confirm against
    # its consumers, which are not visible in this module).
    router_branch_safe: ClassVar[bool] = True

    def __init__(
        self,
        *,
        selector: SelectorSpec[InT] | None = None,
        routes: Mapping[object, Process[InT, OutT]] | None = None,
        predicate_routes: Sequence[Route[InT, OutT]] = (),
        default: Process[InT, OutT] | None = None,
    ) -> None:
        """Store a validated routing declaration.

        Args:
            selector: Key selector for keyed routing, or ``None``.
            routes: Processes keyed by selector result.
            predicate_routes: Ordered predicate routes.
            default: Optional fallback process.

        Raises:
            ValueError: If neither a selector nor any predicate route is
                given, or if a selector is given without keyed routes.
        """
        has_selector = selector is not None

        if not has_selector and not predicate_routes:
            raise ValueError("Router requires a selector or at least one predicate route.")
        if has_selector and not routes:
            raise ValueError("Router.by requires at least one keyed route.")

        # Snapshot the inputs so later changes to the caller's containers do
        # not affect this declaration.
        keyed = dict(routes) if routes else {}
        ordered = tuple(predicate_routes)

        self._selector = selector
        self._routes = keyed
        self._predicate_routes = ordered
        self._default = default

    @classmethod
    def by(
        cls,
        selector: SelectorSpec[InT],
        routes: Mapping[object, Process[InT, OutT]],
        *,
        default: Process[InT, OutT] | None = None,
    ) -> Router[InT, OutT]:
        """Declare key-based routing.

        Args:
            selector: Path expression or custom selector.
            routes: Processes keyed by selector result.
            default: Optional fallback process.

        Returns:
            Router declaration.
        """
        declaration = cls(selector=selector, routes=routes, default=default)
        return declaration

    @classmethod
    def when(
        cls,
        routes: Sequence[Route[InT, OutT]],
        *,
        default: Process[InT, OutT] | None = None,
    ) -> Router[InT, OutT]:
        """Declare ordered first-match predicate routing.

        Args:
            routes: Ordered predicate routes.
            default: Optional fallback process.

        Returns:
            Router declaration.
        """
        declaration = cls(predicate_routes=routes, default=default)
        return declaration

    @property
    def selector(self) -> SelectorSpec[InT] | None:
        """Key selector for ``Router.by`` routes (``None`` if not set)."""
        return self._selector

    @property
    def routes(self) -> Mapping[object, Process[InT, OutT]]:
        """Keyed routes for ``Router.by``, returned as a fresh copy."""
        return self._routes.copy()

    @property
    def predicate_routes(self) -> tuple[Route[InT, OutT], ...]:
        """Ordered predicate routes for ``Router.when``."""
        return self._predicate_routes

    @property
    def default(self) -> Process[InT, OutT] | None:
        """Fallback process, or ``None`` when none was declared."""
        return self._default
# Public names of this module. ``evaluate_predicate`` and ``select_value`` are
# not defined here; they are re-exported from loom.streaming.nodes._expr_eval.
__all__ = ["Route", "Router", "evaluate_predicate", "select_value"]