Source code for loom.streaming.nodes._protocols
"""Routing protocols for custom user logic."""
from __future__ import annotations
from typing import Protocol, TypeVar, runtime_checkable
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.core._message import Message
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct, contravariant=True)
[docs]
@runtime_checkable
class Selector(Protocol[PayloadT]):
"""Custom route-key selector."""
[docs]
def select(self, message: Message[PayloadT]) -> object:
"""Return a route key for *message*."""
...
[docs]
@runtime_checkable
class Predicate(Protocol[PayloadT]):
"""Custom boolean route predicate."""
[docs]
def matches(self, message: Message[PayloadT]) -> bool:
"""Return whether *message* matches this predicate."""
...
__all__ = ["Predicate", "Selector"]