Source code for loom.streaming.kafka._message
"""Standard Kafka message envelope and helpers."""
from __future__ import annotations
import time
from typing import Generic, TypeVar
import msgspec
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.core.tracing import generate_trace_id, get_trace_id
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct)
HEADER_CORRELATION_ID = "x-correlation-id"
HEADER_CAUSATION_ID = "x-causation-id"
HEADER_TRACE_ID = "x-trace-id"
HEADER_PARENT_TRACE_ID = "x-parent-trace-id"
[docs]
class ContentType(LoomFrozenStruct, frozen=True):
"""Describe the logical wire content type of one Kafka payload.
Attributes:
media_type: Stable media type string for the payload bytes.
encoding: Optional encoding qualifier.
"""
media_type: str
encoding: str | None = None
[docs]
@classmethod
def msgpack(cls) -> ContentType:
"""Return the default Loom MessagePack content type.
Returns:
Standard MessagePack content-type descriptor for Loom-native
payloads.
"""
return cls(media_type="application/x-loom-msgpack")
[docs]
@classmethod
def avro(cls) -> ContentType:
"""Return the standard Avro content type.
Returns:
Standard Avro content-type descriptor.
"""
return cls(media_type="application/avro")
[docs]
class SchemaRef(LoomFrozenStruct, frozen=True):
"""Reference a payload schema contract without forcing one registry.
Attributes:
namespace: Schema authority or logical owner.
name: Stable schema or message name.
version: Schema version identifier.
format: Schema representation identifier.
"""
namespace: str
name: str
version: str
format: str
[docs]
class MessageDescriptor(LoomFrozenStruct, frozen=True):
"""Describe the stable identity of a Kafka message contract.
Attributes:
message_type: Stable logical message type name.
message_version: Logical message version.
content_type: Wire content descriptor.
schema_ref: Optional schema reference.
"""
message_type: str
message_version: int
content_type: ContentType = msgspec.field(default_factory=ContentType.msgpack)
schema_ref: SchemaRef | None = None
[docs]
class MessageEnvelope(LoomFrozenStruct, Generic[PayloadT], frozen=True):
"""Standard Kafka message envelope for Loom streaming.
Attributes:
meta: Envelope metadata.
payload: Typed payload body.
"""
meta: MessageMetadata
payload: PayloadT
[docs]
def build_message(
payload: PayloadT,
descriptor: MessageDescriptor,
*,
correlation_id: str | None = None,
parent_trace_id: str | None = None,
causation_id: str | None = None,
trace_id: str | None = None,
produced_at_ms: int | None = None,
) -> MessageEnvelope[PayloadT]:
"""Build the standard Kafka message envelope.
The caller usually provides only the payload and descriptor. Trace context
is taken from the active Loom tracing context when not supplied; when
none is active, a fresh trace identifier is generated so the envelope
still participates in lineage.
Args:
payload: Typed message payload.
descriptor: Stable message contract descriptor.
correlation_id: Optional correlation identifier.
parent_trace_id: Optional upstream trace identifier.
causation_id: Optional upstream message identifier.
trace_id: Optional explicit trace identifier.
produced_at_ms: Optional producer timestamp in epoch milliseconds.
Returns:
A typed message envelope ready for serialization.
"""
active_trace_id = trace_id if trace_id is not None else get_trace_id()
if active_trace_id is None:
active_trace_id = generate_trace_id()
timestamp_ms = produced_at_ms if produced_at_ms is not None else int(time.time() * 1000)
return MessageEnvelope(
meta=MessageMetadata(
descriptor=descriptor,
trace_id=active_trace_id,
parent_trace_id=parent_trace_id,
correlation_id=correlation_id,
causation_id=causation_id,
produced_at_ms=timestamp_ms,
),
payload=payload,
)