Source code for loom.streaming.kafka.message._protocol

"""Message-level Kafka transport protocols."""

from __future__ import annotations

from typing import Protocol, TypeVar

from confluent_kafka import TopicPartition

from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.kafka._message import MessageDescriptor, MessageEnvelope
from loom.streaming.kafka._record import KafkaRecord

PayloadContraT = TypeVar(
    "PayloadContraT",
    bound=LoomStruct | LoomFrozenStruct,
    contravariant=True,
)
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct)


[docs] class MessageProducer(Protocol[PayloadContraT]): """Typed message producer contract. Builds a standard envelope, encodes via codec, and delegates to a raw Kafka producer. """
[docs] def send( self, *, topic: str, payload: PayloadContraT, descriptor: MessageDescriptor, key: bytes | str | None = None, headers: dict[str, bytes] | None = None, correlation_id: str | None = None, parent_trace_id: str | None = None, causation_id: str | None = None, trace_id: str | None = None, produced_at_ms: int | None = None, ) -> None: """Build, encode, and produce one standard message envelope. Args: topic: Target Kafka topic. payload: Typed payload body. descriptor: Stable message descriptor. key: Optional Kafka partition key. headers: Optional Kafka headers. correlation_id: Optional correlation identifier. parent_trace_id: Optional upstream trace identifier. causation_id: Optional upstream message identifier. trace_id: Optional explicit trace identifier. produced_at_ms: Optional producer timestamp in epoch milliseconds. """
[docs] def flush(self, timeout_ms: int | None = None) -> None: """Flush pending records. Args: timeout_ms: Optional maximum flush wait in milliseconds. """
[docs] def close(self) -> None: """Flush and close the producer."""
[docs] class MessageConsumer(Protocol[PayloadT]): """Typed message consumer contract. Polls raw bytes from a Kafka consumer and decodes them into typed message envelopes via codec. """
[docs] def poll(self, timeout_ms: int) -> KafkaRecord[MessageEnvelope[PayloadT]] | None: """Read and decode one standard message envelope. Args: timeout_ms: Maximum poll wait in milliseconds. Returns: One typed record carrying a decoded message envelope, or ``None`` when no record is available. """
[docs] def commit(self, *, asynchronous: bool = False) -> None: """Commit consumed offsets. Args: asynchronous: Whether the backend may commit asynchronously. """
[docs] def commit_offset(self, partitions: list[TopicPartition]) -> None: """Commit explicit offsets. Args: partitions: Kafka topic-partition offsets to commit. """
[docs] def close(self) -> None: """Close the consumer and release resources."""