# Source code for loom.streaming.nodes._protocols

"""Routing protocols for custom user logic."""

from __future__ import annotations

from typing import Protocol, TypeVar, runtime_checkable

from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.core._message import Message

# Payload type variable used to parameterize the ``Selector`` and ``Predicate``
# protocols. It is bounded to ``LoomStruct | LoomFrozenStruct`` and declared
# contravariant.
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct, contravariant=True)


@runtime_checkable
class Selector(Protocol[PayloadT]):
    """Custom route-key selector.

    Structural interface for user-supplied routing logic: any object that
    provides a compatible ``select`` method satisfies this protocol without
    inheriting from it.

    The protocol is ``@runtime_checkable``, so it may be used with
    ``isinstance``. Such a check only verifies that a ``select`` attribute
    exists; it does not validate the method's signature or return type.
    """

    def select(self, message: Message[PayloadT]) -> object:
        """Return a route key for *message*.

        Args:
            message: The message to compute a route key for.

        Returns:
            The route key. It is typed as ``object``, so this protocol itself
            places no constraint on the key's type.
        """
        ...


@runtime_checkable
class Predicate(Protocol[PayloadT]):
    """Custom boolean route predicate.

    Structural interface for user-supplied routing logic: any object that
    provides a compatible ``matches`` method satisfies this protocol without
    inheriting from it.

    The protocol is ``@runtime_checkable``, so it may be used with
    ``isinstance``. Such a check only verifies that a ``matches`` attribute
    exists; it does not validate the method's signature or return type.
    """

    def matches(self, message: Message[PayloadT]) -> bool:
        """Return whether *message* matches this predicate.

        Args:
            message: The message to test.

        Returns:
            ``True`` if *message* matches, ``False`` otherwise.
        """
        ...


# Names exported as this module's public API; ``PayloadT`` is not listed.
__all__ = ["Predicate", "Selector"]