Source code for loom.streaming.kafka.client._protocol
"""Raw Kafka transport protocols."""
from __future__ import annotations
from collections.abc import Callable
from typing import Protocol
from confluent_kafka import TopicPartition
from loom.streaming.kafka._errors import KafkaDeliveryError
from loom.streaming.kafka._record import KafkaRecord
# Signature of a delivery callback: it is called with a ``KafkaRecord[bytes]``
# and either a ``KafkaDeliveryError`` or ``None``, and returns nothing.
# NOTE(review): presumably the error argument is ``None`` when delivery
# succeeded — confirm against the producer implementation that invokes it.
DeliveryCallback = Callable[[KafkaRecord[bytes], KafkaDeliveryError | None], None]
[docs]
class KafkaProducer(Protocol):
    """Structural contract for a raw Kafka producer.

    Implementations send ``KafkaRecord[bytes]`` values to Kafka. This layer
    applies no codec, no envelope and no serialization — it is raw byte
    transport only.
    """

    def send(self, record: KafkaRecord[bytes]) -> None:
        """Produce a single raw byte record.

        Args:
            record: Kafka record whose value is ``bytes``.
        """
        ...

    def flush(self, timeout_ms: int | None = None) -> None:
        """Flush records that are still pending.

        Args:
            timeout_ms: Optional upper bound on the flush wait, in
                milliseconds. Defaults to ``None``.
        """
        ...

    def close(self) -> None:
        """Flush and close the producer."""
        ...
[docs]
class KafkaConsumer(Protocol):
    """Structural contract for a raw Kafka consumer.

    Implementations hand back ``KafkaRecord[bytes]`` values read from Kafka.
    This layer applies no codec, no envelope and no deserialization — it is
    raw byte transport only.
    """

    def poll(self, timeout_ms: int) -> KafkaRecord[bytes] | None:
        """Read a single raw byte record.

        Args:
            timeout_ms: Upper bound on the poll wait, in milliseconds.

        Returns:
            One raw Kafka record, or ``None`` when no record is available.
        """
        ...

    def commit(self, *, asynchronous: bool = False) -> None:
        """Commit the offsets consumed so far.

        Args:
            asynchronous: Whether the backend is allowed to commit
                asynchronously. Keyword-only; defaults to ``False``.
        """
        ...

    def commit_offset(self, partitions: list[TopicPartition]) -> None:
        """Commit explicitly supplied offsets.

        Args:
            partitions: Kafka topic-partition offsets to commit.
        """
        ...

    def close(self) -> None:
        """Close the consumer and release its resources."""
        ...