Source code for loom.streaming.kafka.message._consumer
"""Message-level Kafka consumer with codec and envelope semantics."""
from __future__ import annotations
from time import perf_counter
from typing import Generic, Literal, TypeVar
from confluent_kafka import TopicPartition
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.core.observability.event import LifecycleEvent, Scope
from loom.core.observability.runtime import ObservabilityRuntime
from loom.streaming.kafka._codec import KafkaCodec
from loom.streaming.kafka._errors import KafkaDeserializationError
from loom.streaming.kafka._message import HEADER_CORRELATION_ID, HEADER_TRACE_ID, MessageEnvelope
from loom.streaming.kafka._record import KafkaRecord
from loom.streaming.kafka.client._protocol import KafkaConsumer
PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct)
[docs]
class KafkaMessageConsumer(Generic[PayloadT]):
"""Polls raw bytes from a Kafka consumer and decodes them via codec.
All dependencies are injected via constructor — the message consumer
depends on the ``KafkaConsumer`` and ``KafkaCodec`` protocols, not on
concrete implementations.
Args:
raw: Raw Kafka consumer for byte-level transport.
codec: Codec for decoding bytes to message envelopes.
payload_type: Expected payload model type for decoding.
observer: Optional observability observer.
"""
def __init__(
self,
raw: KafkaConsumer,
codec: KafkaCodec[PayloadT],
payload_type: type[PayloadT],
obs: ObservabilityRuntime | None = None,
) -> None:
self._raw = raw
self._codec = codec
self._payload_type = payload_type
self._obs = obs
[docs]
def poll(self, timeout_ms: int) -> KafkaRecord[MessageEnvelope[PayloadT]] | None:
"""Read and decode one standard message envelope from Kafka.
Args:
timeout_ms: Maximum poll wait in milliseconds.
Returns:
One typed record carrying a decoded message envelope, or
``None`` when no record is available.
Raises:
KafkaPollError: If the backend poll fails.
KafkaDeserializationError: If the envelope cannot be decoded.
"""
record = self._raw.poll(timeout_ms)
if record is None:
return None
started = perf_counter()
try:
message = self._codec.decode(record.value, self._payload_type)
except Exception as exc:
if self._obs is not None:
self._obs.emit(
LifecycleEvent.exception(
scope=Scope.TRANSPORT,
name="kafka_decode",
trace_id=_header_trace_id(record.headers),
correlation_id=_header_correlation_id(record.headers),
error=str(exc),
meta={"topic": record.topic},
)
)
raise KafkaDeserializationError(str(exc)) from exc
if self._obs is not None:
self._obs.emit(
LifecycleEvent.end(
scope=Scope.TRANSPORT,
name="kafka_decode",
duration_ms=(perf_counter() - started) * 1000,
trace_id=message.meta.trace_id,
correlation_id=message.meta.correlation_id,
meta={"content_type": message.meta.descriptor.content_type.media_type},
)
)
return KafkaRecord(
topic=record.topic,
key=record.key,
value=message,
headers=record.headers,
partition=record.partition,
offset=record.offset,
timestamp_ms=record.timestamp_ms,
)
[docs]
def commit(self, *, asynchronous: bool = False) -> None:
"""Commit consumed offsets through the raw consumer.
Args:
asynchronous: Whether the backend may commit asynchronously.
"""
self._raw.commit(asynchronous=asynchronous)
[docs]
def commit_offset(self, partitions: list[TopicPartition]) -> None:
"""Commit explicit offsets through the raw consumer.
Args:
partitions: Kafka topic-partition offsets to commit.
"""
self._raw.commit_offset(partitions)
[docs]
def close(self) -> None:
"""Close the consumer and release resources."""
self._raw.close()
def __enter__(self) -> KafkaMessageConsumer[PayloadT]:
"""Return self for context-manager usage."""
return self
def __exit__(self, *exc: object) -> Literal[False]:
"""Close the consumer on context exit."""
try:
self.close()
except Exception:
if exc[0] is None:
raise
return False
def _header_trace_id(headers: dict[str, bytes]) -> str | None:
raw = headers.get(HEADER_TRACE_ID)
return raw.decode() if raw is not None else None
def _header_correlation_id(headers: dict[str, bytes]) -> str | None:
raw = headers.get(HEADER_CORRELATION_ID)
return raw.decode() if raw is not None else None