Source code for loom.streaming.kafka.message._protocol
"""Message-level Kafka transport protocols."""
from __future__ import annotations
from typing import Protocol, TypeVar
from confluent_kafka import TopicPartition
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.kafka._message import MessageDescriptor, MessageEnvelope
from loom.streaming.kafka._record import KafkaRecord
# Payload type variable for the producer-side protocol. It is declared
# contravariant and only appears as an input (the ``payload`` argument of
# ``MessageProducer.send``). Bounded to the Loom struct base types.
PayloadContraT = TypeVar(
    "PayloadContraT",
    contravariant=True,
    bound=LoomStruct | LoomFrozenStruct,
)

# Payload type variable for the consumer-side protocol, with the same bound
# and default (invariant) variance.
PayloadT = TypeVar(
    "PayloadT",
    bound=LoomStruct | LoomFrozenStruct,
)
[docs]
class MessageProducer(Protocol[PayloadContraT]):
    """Structural contract for a typed Kafka message producer.

    An implementation wraps each payload in the standard message envelope,
    encodes the envelope through a codec, and hands the result to a raw
    Kafka producer.
    """

    def send(
        self,
        *,
        topic: str,
        payload: PayloadContraT,
        descriptor: MessageDescriptor,
        key: bytes | str | None = None,
        headers: dict[str, bytes] | None = None,
        correlation_id: str | None = None,
        parent_trace_id: str | None = None,
        causation_id: str | None = None,
        trace_id: str | None = None,
        produced_at_ms: int | None = None,
    ) -> None:
        """Wrap one payload in a standard envelope, encode it, and produce it.

        Every argument is keyword-only.

        Args:
            topic: Kafka topic the message is produced to.
            payload: Typed body carried by the envelope.
            descriptor: Stable descriptor of the message.
            key: Kafka partition key, if any.
            headers: Kafka headers, if any.
            correlation_id: Correlation identifier, if any.
            parent_trace_id: Trace identifier of the upstream message, if any.
            causation_id: Identifier of the upstream message, if any.
            trace_id: Explicit trace identifier, if any.
            produced_at_ms: Producer timestamp in epoch milliseconds, if any.
        """
        ...

    def flush(self, timeout_ms: int | None = None) -> None:
        """Flush records that are still pending.

        Args:
            timeout_ms: Upper bound on the flush wait, in milliseconds.
                Optional.
        """
        ...

    def close(self) -> None:
        """Flush the producer and then close it."""
        ...
[docs]
class MessageConsumer(Protocol[PayloadT]):
    """Structural contract for a typed Kafka message consumer.

    An implementation polls raw bytes from a Kafka consumer and uses a codec
    to decode them into typed message envelopes.
    """

    def poll(self, timeout_ms: int) -> KafkaRecord[MessageEnvelope[PayloadT]] | None:
        """Poll for one record and decode it into a standard message envelope.

        Args:
            timeout_ms: Upper bound on the poll wait, in milliseconds.

        Returns:
            A typed record whose value is the decoded message envelope, or
            ``None`` if no record is available.
        """
        ...

    def commit(self, *, asynchronous: bool = False) -> None:
        """Commit the offsets consumed so far.

        Args:
            asynchronous: Keyword-only. Whether the backend is allowed to
                perform the commit asynchronously.
        """
        ...

    def commit_offset(self, partitions: list[TopicPartition]) -> None:
        """Commit an explicit set of offsets.

        Args:
            partitions: Kafka topic-partition offsets to commit.
        """
        ...

    def close(self) -> None:
        """Close the consumer and release the resources it holds."""
        ...