Source code for loom.streaming.core._message

"""Transport-neutral streaming message model."""

from __future__ import annotations

from typing import Generic, TypeVar

import msgspec
from typing_extensions import TypeAliasType

from loom.core.model import LoomFrozenStruct, LoomStruct

StreamPayload = TypeAliasType("StreamPayload", LoomStruct | LoomFrozenStruct)
"""Union of all valid streaming payload types."""

PayloadT = TypeVar("PayloadT", bound=StreamPayload, covariant=True)


[docs] class MessageMeta(LoomFrozenStruct, frozen=True, kw_only=True): """Transport-neutral metadata for one logical event. Args: message_id: Stable event identifier. correlation_id: Optional correlation identifier. trace_id: Optional trace identifier. parent_trace_id: Optional upstream trace identifier. causation_id: Optional upstream event identifier. produced_at_ms: Optional original producer timestamp in epoch milliseconds. message_type: Optional logical message contract name. message_version: Optional logical message contract version. topic: Source topic name when available. partition: Source partition when available. offset: Source offset when available. key: Optional transport key when available. headers: Opaque transport headers. """ message_id: str correlation_id: str | None = None trace_id: str | None = None parent_trace_id: str | None = None causation_id: str | None = None produced_at_ms: int | None = None message_type: str | None = None message_version: int | None = None topic: str | None = None partition: int | None = None offset: int | None = None key: bytes | str | None = None headers: dict[str, bytes] = msgspec.field(default_factory=dict)
[docs] class Message(LoomFrozenStruct, Generic[PayloadT], frozen=True, kw_only=True): """Logical typed streaming event. Args: payload: Typed event payload. meta: Transport-neutral metadata. """ payload: PayloadT meta: MessageMeta
__all__ = ["Message", "MessageMeta", "StreamPayload"]