Source code for loom.streaming.core._message
"""Transport-neutral streaming message model."""
from __future__ import annotations
from typing import Generic, TypeVar
import msgspec
from typing_extensions import TypeAliasType
from loom.core.model import LoomFrozenStruct, LoomStruct
StreamPayload = TypeAliasType("StreamPayload", LoomStruct | LoomFrozenStruct)
"""Union of all valid streaming payload types."""
PayloadT = TypeVar("PayloadT", bound=StreamPayload, covariant=True)
[docs]
class MessageMeta(LoomFrozenStruct, frozen=True, kw_only=True):
"""Transport-neutral metadata for one logical event.
Args:
message_id: Stable event identifier.
correlation_id: Optional correlation identifier.
trace_id: Optional trace identifier.
parent_trace_id: Optional upstream trace identifier.
causation_id: Optional upstream event identifier.
produced_at_ms: Optional original producer timestamp in epoch
milliseconds.
message_type: Optional logical message contract name.
message_version: Optional logical message contract version.
topic: Source topic name when available.
partition: Source partition when available.
offset: Source offset when available.
key: Optional transport key when available.
headers: Opaque transport headers.
"""
message_id: str
correlation_id: str | None = None
trace_id: str | None = None
parent_trace_id: str | None = None
causation_id: str | None = None
produced_at_ms: int | None = None
message_type: str | None = None
message_version: int | None = None
topic: str | None = None
partition: int | None = None
offset: int | None = None
key: bytes | str | None = None
headers: dict[str, bytes] = msgspec.field(default_factory=dict)
[docs]
class Message(LoomFrozenStruct, Generic[PayloadT], frozen=True, kw_only=True):
"""Logical typed streaming event.
Args:
payload: Typed event payload.
meta: Transport-neutral metadata.
"""
payload: PayloadT
meta: MessageMeta
__all__ = ["Message", "MessageMeta", "StreamPayload"]