Source code for loom.streaming.kafka._codec

"""Codec protocol and MessagePack implementation for Kafka messages."""

from __future__ import annotations

from functools import lru_cache
from typing import Generic, Protocol, TypeVar

import msgspec

from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.streaming.kafka._message import MessageEnvelope

PayloadT = TypeVar("PayloadT", bound=LoomStruct | LoomFrozenStruct)


[docs] class KafkaCodec(Protocol[PayloadT]): """Encode and decode Kafka message envelopes."""
[docs] def encode(self, message: MessageEnvelope[PayloadT]) -> bytes: """Serialize one message envelope to bytes. Args: message: Typed message envelope. Returns: Encoded message bytes. """ ...
[docs] def decode(self, raw: bytes, payload_type: type[PayloadT]) -> MessageEnvelope[PayloadT]: """Deserialize bytes to one message envelope. Args: raw: Raw Kafka payload bytes. payload_type: Expected payload model type. Returns: Decoded typed message envelope. """ ...
[docs] class MsgspecCodec(Generic[PayloadT]): """Direct MessagePack codec for Kafka message envelopes."""
[docs] def encode(self, message: MessageEnvelope[PayloadT]) -> bytes: """Serialize one typed message envelope. Args: message: Typed message envelope. Returns: MessagePack bytes. """ return msgspec.msgpack.encode(message)
[docs] def decode(self, raw: bytes, payload_type: type[PayloadT]) -> MessageEnvelope[PayloadT]: """Deserialize one typed message envelope. Args: raw: Raw MessagePack bytes. payload_type: Expected payload model type. Returns: Decoded typed message envelope. """ envelope_type = _envelope_type(payload_type) # type: ignore[arg-type] decoded: MessageEnvelope[PayloadT] = msgspec.msgpack.decode(raw, type=envelope_type) return decoded
@lru_cache(maxsize=128) def _envelope_type(payload_type: type[PayloadT]) -> object: """Return the parametrised msgspec envelope type, cached per payload class.""" return MessageEnvelope[payload_type] # type: ignore[valid-type]