Source code for loom.streaming.kafka._key_resolver

"""Partition-key resolver contracts and built-ins."""

from __future__ import annotations

from typing import Generic, Protocol, TypeVar

from loom.core.model import LoomFrozenStruct
from loom.streaming.kafka._record import KafkaRecord

PayloadT = TypeVar("PayloadT")


class PartitionKeyResolver(Protocol[PayloadT]):
    """Structural contract for objects that choose a Kafka partition key.

    Being a ``Protocol``, it is satisfied by any object exposing a
    compatible ``resolve`` method; explicit subclassing is not required.
    """

    def resolve(self, record: KafkaRecord[PayloadT]) -> bytes | None:
        """Derive the partition key for one outgoing record.

        Args:
            record: Typed Kafka record to inspect.

        Returns:
            Partition-key bytes, or ``None`` for broker-controlled routing.
        """
        ...


class PreserveKey(Generic[PayloadT]):
    """Resolver that reuses whatever key the record already carries."""

    def resolve(self, record: KafkaRecord[PayloadT]) -> bytes | None:
        """Return the record's current key in byte form.

        Args:
            record: Typed Kafka record whose ``key`` attribute is read.

        Returns:
            ``None`` when the record has no key, the key itself when it is
            already ``bytes``, otherwise the key encoded as UTF-8.
        """
        existing = record.key
        # Both "no key" and "already bytes" pass through untouched.
        if existing is None or isinstance(existing, bytes):
            return existing
        return existing.encode("utf-8")


class FixedKey(LoomFrozenStruct, frozen=True):
    """Resolver that hands back the same constant key for every record.

    Attributes:
        value: Partition-key bytes returned by ``resolve``.
    """

    value: bytes

    def resolve(self, record: object) -> bytes:
        """Return the configured key without consulting the record.

        Args:
            record: Kafka record or compatible object. It is never read.

        Returns:
            The bytes stored in ``value``.
        """
        _ = record  # accepted only so the signature matches other resolvers
        return self.value