# Source code for loom.streaming.nodes._boundary
"""Topic-oriented streaming boundaries and partitioning contracts."""
from __future__ import annotations
from enum import StrEnum
from typing import ClassVar, Generic, Protocol, TypeVar
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.core.routing import LogicalRef, as_logical_ref
from loom.streaming.core._message import Message
from loom.streaming.nodes._shape import StreamShape
# Payload type variable used by ``FromTopic``, ``IntoTopic``,
# ``PartitionStrategy`` and ``PartitionPolicy``. Declared contravariant and
# bound to the Loom struct base types.
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct, contravariant=True)
# Payload type variable used by ``FromMultiTypeTopic``. Declared covariant and
# bound to the same Loom struct base types.
MultiPayloadT = TypeVar("MultiPayloadT", bound=LoomStruct | LoomFrozenStruct, covariant=True)
[docs]
class FromTopic(LoomFrozenStruct, Generic[PayloadT], frozen=True):
    """Input boundary that reads a stream from a single named topic.

    Pattern:
        Boundary node.

    Args:
        name: Logical reference of the input. Unless overridden, the same
            value is used as the Kafka config reference.
        payload: Type descriptor of the logical payload carried by the topic.
        shape: Declared shape of the source. Defaults to
            ``StreamShape.RECORD``.
    """

    name: str
    payload: type[PayloadT]
    shape: StreamShape = StreamShape.RECORD

    @property
    def logical_ref(self) -> LogicalRef:
        """Logical input reference built from ``name``."""
        ref: LogicalRef = as_logical_ref(self.name)
        return ref
[docs]
class FromMultiTypeTopic(LoomFrozenStruct, Generic[MultiPayloadT], frozen=True):
    """Input boundary for one topic that carries several payload types.

    A single Kafka topic is read and every record is handed to the decoder
    matching its type. The runtime selects the decoder from:

    * ``MessageEnvelope.meta.descriptor.message_type`` for plain payloads,
    * ``ErrorEnvelope.payload_type`` for routed error envelopes,
    * the dedicated wire error message type for ``DecodeError`` payloads.

    For each entry in ``payloads`` the message type is taken from
    ``__loom_message_type__`` when the type defines it; otherwise the fully
    qualified Python name ``f"{type.__module__}.{type.__qualname__}"`` is
    used. This lets the compiler build the dispatch table at compile time
    without extra boilerplate. ``DecodeError`` is a special-case wire payload
    that has its own explicit message type.

    Args:
        name: Logical input reference.
        payloads: Tuple of two or more expected payload types.
        shape: Declared source shape. Defaults to ``StreamShape.RECORD``.

    Raises:
        ValueError: When fewer than two payload types are declared.

    Example:
        .. code-block:: python

            source = FromMultiTypeTopic(
                "events.all",
                payloads=(OrderCreated, OrderCancelled),
            )
    """

    name: str
    payloads: tuple[type[LoomStruct | LoomFrozenStruct], ...]
    shape: StreamShape = StreamShape.RECORD

    def __post_init__(self) -> None:
        """Reject declarations that list fewer than two payload types."""
        if len(self.payloads) >= 2:
            return
        raise ValueError(
            "FromMultiTypeTopic requires at least two payload types. "
            "Use FromTopic for a single-type source."
        )

    @property
    def logical_ref(self) -> LogicalRef:
        """Logical input reference built from ``name``."""
        ref: LogicalRef = as_logical_ref(self.name)
        return ref
[docs]
class IntoTopic(LoomFrozenStruct, Generic[PayloadT], frozen=True):
    """Output boundary that writes a stream to a named topic.

    Pattern:
        Boundary node.

    Args:
        name: Logical reference of the output. Unless overridden, the same
            value is used as the Kafka config reference.
        payload: Optional type descriptor of the logical payload.
        shape: Declared shape of the output. Defaults to
            ``StreamShape.RECORD``.
        partitioning: Optional partitioning policy for produced records.
        dlq: Optional name of a dead-letter topic. When provided, messages
            whose delivery fails are routed there instead of crashing the
            worker. The DLQ producer reuses the broker settings of the main
            output and attaches an ``x-dlq-error`` header carrying the
            failure reason.
    """

    name: str
    payload: type[PayloadT] | None = None
    shape: StreamShape = StreamShape.RECORD
    partitioning: PartitionPolicy[PayloadT] | None = None
    dlq: str | None = None

    # Class-level flag, not a per-instance field.
    router_branch_safe: ClassVar[bool] = True

    @property
    def logical_ref(self) -> LogicalRef:
        """Logical output reference built from ``name``."""
        ref: LogicalRef = as_logical_ref(self.name)
        return ref
[docs]
class PartitionStrategy(Protocol[PayloadT]):
    """Structural interface for computing a message's outgoing partition key."""

    def partition_key(self, message: Message[PayloadT]) -> bytes | str | None:
        """Return the outgoing transport partition key for ``message``."""
        ...
[docs]
class PartitionGuarantee(StrEnum):
"""Declared affinity guarantee of a partitioning strategy."""
NONE = "none"
BEST_EFFORT = "best_effort"
ENTITY_STABLE = "entity_stable"
[docs]
class PartitionPolicy(LoomFrozenStruct, Generic[PayloadT], frozen=True):
    """Declarative description of how topic output is partitioned.

    Pattern:
        Boundary helper.

    Args:
        strategy: Strategy that yields the partition key for output records.
        guarantee: Affinity guarantee the strategy declares.
        allow_repartition: Whether the flow may override the incoming key.
            Defaults to ``False``.
    """

    strategy: PartitionStrategy[PayloadT]
    guarantee: PartitionGuarantee
    allow_repartition: bool = False
# Public names of this module (what ``from ... import *`` exposes).
__all__ = [
"FromMultiTypeTopic",
"FromTopic",
"IntoTopic",
"PartitionGuarantee",
"PartitionPolicy",
"PartitionStrategy",
]