# Source code for loom.streaming.core._errors
"""Streaming error model for explicit error branches."""
from __future__ import annotations
from enum import StrEnum
from typing import Generic, TypeVar
import msgspec
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.core._message import Message
# Type variable for the domain payload carried inside error structures.
# It is bounded to the project's struct base classes (mutable or frozen) and
# declared covariant.
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct, covariant=True)
[docs]
class ErrorKind(StrEnum):
"""Logical categories for streaming flow errors."""
WIRE = "wire"
ROUTING = "routing"
TASK = "task"
BUSINESS = "business"
[docs]
class ErrorMessage(LoomFrozenStruct, Generic[PayloadT], frozen=True, kw_only=True):
    """Frozen, keyword-only snapshot of the message an error refers to.

    The snapshot pairs the original domain payload with a copy of the
    original message metadata so it can travel with an error.

    Args:
        payload: Domain payload of the original message.
        meta: Metadata snapshot taken from the original message.
    """

    payload: PayloadT
    meta: ErrorMessageMeta
[docs]
class ErrorEnvelope(LoomFrozenStruct, Generic[PayloadT], frozen=True):
    """Frozen error record that travels through explicit error branches.

    Args:
        kind: Logical category of the error.
        reason: Human-readable explanation of what went wrong.
        payload_type: ``message_type`` of the original payload. It mirrors
            ``original_message.meta.message_type`` so consumers can dispatch
            on the top-level field without decoding the nested message.
            Defaults to ``None``.
        original_message: Wire-safe snapshot of the message that caused the
            error, when one is available. Defaults to ``None``.
    """

    # Required fields.
    kind: ErrorKind
    reason: str

    # Optional context about the message that triggered the error.
    payload_type: str | None = None
    original_message: ErrorMessage[PayloadT] | None = None
def snapshot_message(message: Message[PayloadT]) -> ErrorMessage[PayloadT]:
    """Build an :class:`ErrorMessage` snapshot from a streaming message.

    The payload is carried over unchanged and every metadata field is
    transferred one-to-one into a new ``ErrorMessageMeta``. The only value
    that is transformed is the key, which goes through ``_normalize_key``.

    Args:
        message: Original transport-neutral message to snapshot.

    Returns:
        A snapshot holding the message payload and its transport metadata.
    """
    original_payload = message.payload
    source = message.meta

    meta_snapshot = ErrorMessageMeta(
        # Identity and tracing information.
        message_id=source.message_id,
        correlation_id=source.correlation_id,
        trace_id=source.trace_id,
        parent_trace_id=source.parent_trace_id,
        causation_id=source.causation_id,
        produced_at_ms=source.produced_at_ms,
        # Payload typing information.
        message_type=source.message_type,
        message_version=source.message_version,
        # Transport coordinates.
        topic=source.topic,
        partition=source.partition,
        offset=source.offset,
        # The key may arrive as ``str``; the snapshot stores ``bytes | None``.
        key=_normalize_key(source.key),
        # Headers are handed over as-is (same object, no copy).
        headers=source.headers,
    )
    return ErrorMessage(payload=original_payload, meta=meta_snapshot)
def _normalize_key(key: bytes | str | None) -> bytes | None:
if key is None:
return None
if isinstance(key, bytes):
return key
return key.encode("utf-8")
# Names exported as this module's public API.
# NOTE(review): ``ErrorMessageMeta`` is exported here and used by the code
# above, but its definition/import is not visible in this portion of the
# file - confirm it is defined in this module.
__all__ = [
"ErrorEnvelope",
"ErrorKind",
"ErrorMessage",
"ErrorMessageMeta",
"snapshot_message",
]