loom.streaming.kafka

Kafka transport contracts for Loom streaming.

class loom.streaming.kafka.ContentType(media_type, encoding=None)[source]

Bases: LoomFrozenStruct

Describe the logical wire content type of one Kafka payload.

Parameters:
  • media_type (str)

  • encoding (str | None)

media_type

Stable media type string for the payload bytes.

Type:

str

encoding

Optional encoding qualifier.

Type:

str | None

classmethod msgpack()[source]

Return the default Loom MessagePack content type.

Returns:

Standard MessagePack content-type descriptor for Loom-native payloads.

Return type:

ContentType

classmethod avro()[source]

Return the standard Avro content type.

Returns:

Standard Avro content-type descriptor.

Return type:

ContentType

class loom.streaming.kafka.ConsumerSettings(*, brokers, group_id, topics, auto_offset_reset='earliest', poll_timeout_ms=100, enable_auto_commit=True, security=None, extra=<factory>)[source]

Bases: LoomFrozenStruct

Typed Kafka consumer settings.

brokers

Kafka broker addresses.

Type:

tuple[str, ...]

group_id

Consumer group identifier.

Type:

str

topics

Topics to subscribe to.

Type:

tuple[str, ...]

auto_offset_reset

Offset reset policy.

Type:

Literal['earliest', 'latest']

poll_timeout_ms

Maximum milliseconds to block waiting for a message on each poll call. Higher values reduce CPU usage when the topic is idle; lower values decrease end-to-end latency. Defaults to 100.

Type:

int

enable_auto_commit

Whether Kafka should auto-commit offsets.

Type:

bool

security

Optional security configuration.

Type:

KafkaSecuritySettings | None

extra

Optional extra Confluent settings.

Type:

dict[str, str | int | float | bool]

to_confluent_config()[source]

Compile settings to a Confluent-compatible config mapping.

Returns:

String-keyed Confluent configuration mapping.

Raises:

ValueError – If extra redefines a typed configuration key.

Return type:

dict[str, str | int | float | bool]
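
The conflict check described for to_confluent_config() can be illustrated with a small sketch. It does not import Loom: ConsumerSettingsSketch is a hypothetical stand-in, and the mapping from typed fields to librdkafka property names (bootstrap.servers, group.id, auto.offset.reset, enable.auto.commit) is an assumption about how such a method might behave, not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import Union

ConfigValue = Union[str, int, float, bool]


@dataclass(frozen=True)
class ConsumerSettingsSketch:
    """Hypothetical stand-in for ConsumerSettings; not Loom's implementation."""

    brokers: tuple[str, ...]
    group_id: str
    topics: tuple[str, ...]
    auto_offset_reset: str = "earliest"
    enable_auto_commit: bool = True
    extra: dict[str, ConfigValue] = field(default_factory=dict)

    def to_confluent_config(self) -> dict[str, ConfigValue]:
        # Typed fields map to librdkafka property names (assumed mapping).
        typed: dict[str, ConfigValue] = {
            "bootstrap.servers": ",".join(self.brokers),
            "group.id": self.group_id,
            "auto.offset.reset": self.auto_offset_reset,
            "enable.auto.commit": self.enable_auto_commit,
        }
        # Reject extras that would silently override a typed field.
        clashes = sorted(set(self.extra) & set(typed))
        if clashes:
            raise ValueError(f"extra redefines typed keys: {clashes}")
        return {**typed, **self.extra}


settings = ConsumerSettingsSketch(
    brokers=("localhost:9092",),
    group_id="billing",
    topics=("orders",),
    extra={"session.timeout.ms": 6000},
)
config = settings.to_confluent_config()
```

Failing fast on a clash keeps the typed fields as the single source of truth for the keys they own, while extra remains available for everything else.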

class loom.streaming.kafka.DecodeError(error, raw, topic, key, headers, partition=None, offset=None, timestamp_ms=None)[source]

Bases: LoomFrozenStruct

Failed Kafka wire decode result with raw dead-letter context.

Parameters:
  • error (ErrorEnvelope[Any]) – Structured WIRE error for DSL error routing.

  • raw (bytes) – Original record bytes that failed to decode.

  • topic (str) – Source Kafka topic.

  • key (bytes | None) – Source Kafka key, normalized to bytes when available.

  • headers (dict[str, bytes]) – Source Kafka headers.

  • partition (int | None) – Source Kafka partition when available.

  • offset (int | None) – Source Kafka offset when available.

  • timestamp_ms (int | None) – Source Kafka timestamp when available.

classmethod loom_message_type()[source]

Return the stable logical message type for wire decode errors.

Return type:

str

class loom.streaming.kafka.DecodeOk(message)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Successful Kafka wire decode result.

Parameters:

message (Message[PayloadT]) – Transport-neutral message ready for DSL execution.

class loom.streaming.kafka.DispatchTable(plain, error, wire)[source]

Bases: object

Pre-built decode dispatch table for heterogeneous Kafka topics.

class loom.streaming.kafka.FixedKey(value)[source]

Bases: LoomFrozenStruct

Always use one fixed partition key.

Parameters:

value (bytes)

resolve(record)[source]

Return the fixed key.

Parameters:

record (object) – Kafka record or compatible object. Its payload is ignored.

Returns:

Fixed key bytes.

Return type:

bytes

exception loom.streaming.kafka.KafkaClientError[source]

Bases: Exception

Base error for Kafka transport operations.

class loom.streaming.kafka.KafkaCodec(*args, **kwargs)[source]

Bases: Protocol[PayloadT]

Encode and decode Kafka message envelopes.

encode(message)[source]

Serialize one message envelope to bytes.

Parameters:

message (MessageEnvelope[PayloadT]) – Typed message envelope.

Returns:

Encoded message bytes.

Return type:

bytes

decode(raw, payload_type)[source]

Deserialize bytes to one message envelope.

Parameters:
  • raw (bytes) – Raw Kafka payload bytes.

  • payload_type (type[PayloadT]) – Expected payload model type.

Returns:

Decoded typed message envelope.

Return type:

MessageEnvelope[PayloadT]
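
To make the encode/decode contract concrete, here is a minimal sketch of a codec with the same method shapes. It is written against stand-in types rather than Loom itself: EnvelopeSketch, JsonCodecSketch, and Ping are hypothetical names, and JSON is used only because it is in the standard library (the module's own MsgspecCodec uses MessagePack).

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Generic, TypeVar

PayloadT = TypeVar("PayloadT")


@dataclass(frozen=True)
class EnvelopeSketch(Generic[PayloadT]):
    """Hypothetical stand-in for MessageEnvelope: metadata plus typed payload."""

    meta: dict[str, Any]
    payload: PayloadT


class JsonCodecSketch(Generic[PayloadT]):
    """Illustrative codec with the encode(message) / decode(raw, payload_type) shape."""

    def encode(self, message: EnvelopeSketch[PayloadT]) -> bytes:
        # Payloads are assumed to be dataclass instances in this sketch.
        body = {"meta": message.meta, "payload": asdict(message.payload)}
        return json.dumps(body, sort_keys=True).encode("utf-8")

    def decode(self, raw: bytes, payload_type: type[PayloadT]) -> EnvelopeSketch[PayloadT]:
        body = json.loads(raw.decode("utf-8"))
        # The caller supplies the payload type because the bytes alone do not carry it.
        return EnvelopeSketch(meta=body["meta"], payload=payload_type(**body["payload"]))


@dataclass(frozen=True)
class Ping:
    seq: int


codec = JsonCodecSketch()
original = EnvelopeSketch(meta={"message_type": "ping"}, payload=Ping(seq=7))
restored = codec.decode(codec.encode(original), Ping)
```

Because KafkaCodec is a Protocol, any object with these two methods would satisfy it structurally; no inheritance is required.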

exception loom.streaming.kafka.KafkaCommitError[source]

Bases: KafkaConsumerError

Raised when Kafka offset commit fails.

exception loom.streaming.kafka.KafkaConfigurationError[source]

Bases: KafkaClientError

Raised when client configuration is invalid or incomplete.

class loom.streaming.kafka.KafkaConsumer(*args, **kwargs)[source]

Bases: Protocol

Raw Kafka consumer contract.

Returns KafkaRecord[bytes] from Kafka. No codec, no envelope, no deserialization — only raw byte transport.

poll(timeout_ms)[source]

Read one raw byte record.

Parameters:

timeout_ms (int) – Maximum poll wait in milliseconds.

Returns:

One raw Kafka record or None when no record is available.

Return type:

KafkaRecord[bytes] | None

commit(*, asynchronous=False)[source]

Commit consumed offsets.

Parameters:

asynchronous (bool) – Whether the backend may commit asynchronously.

Return type:

None

commit_offset(partitions)[source]

Commit explicit offsets.

Parameters:

partitions (list[TopicPartition]) – Kafka topic-partition offsets to commit.

Return type:

None

close()[source]

Close the consumer and release resources.

Return type:

None
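
One way to exercise code written against this contract is an in-memory test double. The sketch below is an assumption-laden illustration, not part of Loom: RawRecordSketch, InMemoryConsumer, and drain are hypothetical names, and the "commit after each handled record" policy is just one possible choice.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class RawRecordSketch:
    """Hypothetical stand-in for KafkaRecord[bytes] with only the fields used here."""

    topic: str
    value: bytes
    offset: int


class InMemoryConsumer:
    """Test double exposing the poll/commit/close shape of the contract above."""

    def __init__(self, records: list[RawRecordSketch]) -> None:
        self._pending = deque(records)
        self.commits = 0
        self.closed = False

    def poll(self, timeout_ms: int) -> Optional[RawRecordSketch]:
        # A real consumer may block for up to timeout_ms; this one never waits.
        return self._pending.popleft() if self._pending else None

    def commit(self, *, asynchronous: bool = False) -> None:
        self.commits += 1

    def close(self) -> None:
        self.closed = True


def drain(
    consumer: InMemoryConsumer,
    handle: Callable[[bytes], None],
    max_idle_polls: int = 3,
) -> int:
    """Handle records until poll returns None max_idle_polls times in a row."""
    handled = 0
    idle = 0
    try:
        while idle < max_idle_polls:
            record = consumer.poll(timeout_ms=100)
            if record is None:
                idle += 1
                continue
            idle = 0
            handle(record.value)
            consumer.commit()  # commit only after the handler returned normally
            handled += 1
    finally:
        consumer.close()  # release resources even if the handler raised
    return handled
```

Committing after the handler returns gives at-least-once processing in this sketch: a crash between handle and commit means the record is seen again on restart.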

class loom.streaming.kafka.KafkaConsumerClient(settings, obs=None)[source]

Bases: object

Confluent-backed raw Kafka consumer.

Returns KafkaRecord[bytes] from Kafka. No deserialization — values are raw bytes as received from the broker.

Parameters:
  • settings (ConsumerSettings) – Typed consumer settings.

  • obs (ObservabilityRuntime | None) – Optional observability runtime.

poll(timeout_ms)[source]

Read one raw byte record from Kafka.

Parameters:

timeout_ms (int) – Maximum poll wait in milliseconds.

Returns:

One raw Kafka record or None when no record is available.

Raises:

KafkaPollError – If the backend poll fails or returns a broker error.

Return type:

KafkaRecord[bytes] | None

commit(*, asynchronous=False)[source]

Commit consumed offsets.

Parameters:

asynchronous (bool) – Whether the backend may commit asynchronously.

Raises:

KafkaCommitError – If the backend commit fails.

Return type:

None

commit_offset(partitions)[source]

Commit explicit Kafka offsets.

Parameters:

partitions (list[TopicPartition]) – Kafka topic-partition offsets to commit.

Raises:

KafkaCommitError – If the backend commit fails.

Return type:

None

close()[source]

Close the consumer and release resources.

Return type:

None

exception loom.streaming.kafka.KafkaConsumerError[source]

Bases: KafkaClientError

Base error for consumer-side failures.

exception loom.streaming.kafka.KafkaDeliveryError[source]

Bases: KafkaProducerError

Raised when Kafka delivery fails.

exception loom.streaming.kafka.KafkaDeserializationError[source]

Bases: KafkaConsumerError

Raised when Kafka payload bytes cannot be decoded.

class loom.streaming.kafka.KafkaMessageConsumer(raw, codec, payload_type, obs=None)[source]

Bases: Generic[PayloadT]

Polls raw bytes from a Kafka consumer and decodes them via codec.

All dependencies are injected via constructor — the message consumer depends on the KafkaConsumer and KafkaCodec protocols, not on concrete implementations.

Parameters:
  • raw (KafkaConsumer) – Raw Kafka consumer for byte-level transport.

  • codec (KafkaCodec[PayloadT]) – Codec for decoding bytes to message envelopes.

  • payload_type (type[PayloadT]) – Expected payload model type for decoding.

  • obs (ObservabilityRuntime | None) – Optional observability runtime.

poll(timeout_ms)[source]

Read and decode one standard message envelope from Kafka.

Parameters:

timeout_ms (int) – Maximum poll wait in milliseconds.

Returns:

One typed record carrying a decoded message envelope, or None when no record is available.

Return type:

KafkaRecord[MessageEnvelope[PayloadT]] | None

commit(*, asynchronous=False)[source]

Commit consumed offsets through the raw consumer.

Parameters:

asynchronous (bool) – Whether the backend may commit asynchronously.

Return type:

None

commit_offset(partitions)[source]

Commit explicit offsets through the raw consumer.

Parameters:

partitions (list[TopicPartition]) – Kafka topic-partition offsets to commit.

Return type:

None

close()[source]

Close the consumer and release resources.

Return type:

None

class loom.streaming.kafka.KafkaMessageProducer(raw, codec, key_resolver=None, use_message_timestamp=True, obs=None)[source]

Bases: Generic[PayloadT]

Builds a standard envelope, encodes via codec, delegates to a raw producer.

All dependencies are injected via constructor — the message producer depends on the KafkaProducer and KafkaCodec protocols, not on concrete implementations.

Parameters:
  • raw (KafkaProducer) – Raw Kafka producer for byte-level transport.

  • codec (KafkaCodec[PayloadT]) – Codec for encoding message envelopes to bytes.

  • key_resolver (PartitionKeyResolver[MessageEnvelope[PayloadT]] | None) – Optional resolver used when send does not receive an explicit key.

  • use_message_timestamp (bool) – Whether Kafka records should carry the envelope produced_at_ms timestamp.

  • obs (ObservabilityRuntime | None) – Optional observability runtime.

send(*, topic, payload, descriptor, key=None, headers=None, correlation_id=None, parent_trace_id=None, causation_id=None, trace_id=None, produced_at_ms=None)[source]

Build, encode, and produce one standard message envelope.

Parameters:
  • topic (str) – Target Kafka topic.

  • payload (PayloadT) – Typed payload body.

  • descriptor (MessageDescriptor) – Stable message descriptor.

  • key (bytes | str | None) – Optional Kafka partition key.

  • headers (dict[str, bytes] | None) – Optional Kafka headers.

  • correlation_id (str | None) – Optional correlation identifier.

  • parent_trace_id (str | None) – Optional upstream trace identifier.

  • causation_id (str | None) – Optional upstream message identifier.

  • trace_id (str | None) – Optional explicit trace identifier.

  • produced_at_ms (int | None) – Optional producer timestamp in epoch milliseconds.

Return type:

None

flush(timeout_ms=None)[source]

Flush pending records.

Parameters:

timeout_ms (int | None) – Optional maximum flush wait in milliseconds.

Raises:

KafkaDeliveryError – If flush fails or pending delivery errors remain.

Return type:

None

close()[source]

Flush and close the producer.

Raises:

KafkaDeliveryError – If pending delivery failures remain.

Return type:

None

exception loom.streaming.kafka.KafkaPollError[source]

Bases: KafkaConsumerError

Raised when Kafka polling returns a backend error.

class loom.streaming.kafka.KafkaProducer(*args, **kwargs)[source]

Bases: Protocol

Raw Kafka producer contract.

Sends KafkaRecord[bytes] to Kafka. No codec, no envelope, no serialization — only raw byte transport.

send(record)[source]

Produce one raw byte record.

Parameters:

record (KafkaRecord[bytes]) – Kafka record with a bytes value.

Return type:

None

flush(timeout_ms=None)[source]

Flush pending records.

Parameters:

timeout_ms (int | None) – Optional maximum flush wait in milliseconds.

Return type:

None

close()[source]

Flush and close the producer.

Return type:

None

class loom.streaming.kafka.KafkaProducerClient(settings, delivery_callback=None, obs=None)[source]

Bases: object

Confluent-backed raw Kafka producer.

Sends KafkaRecord[bytes] to Kafka. All values must already be serialized to bytes before calling send().

Parameters:
  • settings (ProducerSettings) – Typed producer settings.

  • delivery_callback (DeliveryCallback | None) – Optional callback notified on delivery success or failure.

  • obs (ObservabilityRuntime | None) – Optional observability runtime.

send(record)[source]

Produce one raw byte record.

Parameters:

record (KafkaRecord[bytes]) – Kafka record with a bytes value.

Raises:

KafkaDeliveryError – If Kafka rejects the produce call.

Return type:

None

flush(timeout_ms=None)[source]

Flush pending records and materialize delivery failures.

Parameters:

timeout_ms (int | None) – Optional maximum flush wait in milliseconds.

Raises:

KafkaDeliveryError – If flush fails or a pending delivery error exists. Pending delivery errors are consumed when raised, so a later flush call will not raise the same error again.

Return type:

None
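
The "consumed when raised" behaviour of flush() can be pictured with a small sketch. This is not the Confluent-backed client: ProducerSketch and DeliveryErrorSketch are hypothetical stand-ins, and keeping only the first pending failure is an assumption made to keep the example short.

```python
from typing import Optional


class DeliveryErrorSketch(Exception):
    """Hypothetical stand-in for KafkaDeliveryError."""


class ProducerSketch:
    """Records asynchronous delivery failures and surfaces them on flush."""

    def __init__(self) -> None:
        self._pending_error: Optional[str] = None

    def on_delivery(self, error: Optional[str]) -> None:
        # Delivery callback: remember the first failure until flush reports it.
        if error is not None and self._pending_error is None:
            self._pending_error = error

    def flush(self) -> None:
        # Clear the stored error before raising so it is reported exactly once.
        error, self._pending_error = self._pending_error, None
        if error is not None:
            raise DeliveryErrorSketch(error)


producer = ProducerSketch()
producer.on_delivery("broker unavailable")
try:
    producer.flush()
except DeliveryErrorSketch:
    pass  # first flush surfaces the failure
producer.flush()  # second flush finds nothing pending and returns normally
```

The practical consequence is that a caller who catches the error and retries flush should not expect the same failure to be raised again.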

close()[source]

Flush and close the producer.

Raises:

KafkaDeliveryError – If pending delivery failures remain.

Return type:

None

exception loom.streaming.kafka.KafkaProducerError[source]

Bases: KafkaClientError

Base error for producer-side failures.

class loom.streaming.kafka.KafkaRecord(topic, key, value, headers=<factory>, partition=None, offset=None, timestamp_ms=None)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Typed Kafka transport record.

topic

Kafka topic name.

Type:

str

key

Transport key used for partitioning.

Type:

bytes | str | None

value

Typed record value.

Type:

PayloadT

headers

Kafka headers.

Type:

dict[str, bytes]

partition

Partition number when known.

Type:

int | None

offset

Offset when known.

Type:

int | None

timestamp_ms

Broker or producer timestamp in epoch milliseconds.

Type:

int | None

class loom.streaming.kafka.KafkaSecuritySettings(*, protocol, sasl_mechanism=None, sasl_username=None, sasl_password=None, ssl_ca_location=None)[source]

Bases: LoomFrozenStruct

Optional Kafka security settings.

Parameters:
  • protocol (Literal['PLAINTEXT', 'SSL', 'SASL_SSL', 'SASL_PLAINTEXT'])

  • sasl_mechanism (str | None)

  • sasl_username (str | None)

  • sasl_password (str | None)

  • ssl_ca_location (str | None)

protocol

Kafka security protocol.

Type:

Literal['PLAINTEXT', 'SSL', 'SASL_SSL', 'SASL_PLAINTEXT']

sasl_mechanism

Optional SASL mechanism.

Type:

str | None

sasl_username

Optional SASL username.

Type:

str | None

sasl_password

Optional SASL password.

Type:

str | None

ssl_ca_location

Optional CA file path.

Type:

str | None

to_confluent_config()[source]

Compile security settings to Confluent-compatible keys.

Returns:

String-keyed Confluent security configuration mapping.

Return type:

dict[str, str | int | float | bool]

class loom.streaming.kafka.KafkaSettings(*, producer=None, consumer=None, producers=<factory>, consumers=<factory>)[source]

Bases: LoomFrozenStruct

Typed Kafka settings loaded from a YAML config section.

producer_for(ref)[source]

Resolve producer settings by logical reference with default fallback.

Parameters:

ref (str | LogicalRef) – Logical output reference.

Returns:

Specific producer settings when present, otherwise the common producer settings.

Raises:

KeyError – If neither a specific nor common producer is configured.

Return type:

ProducerSettings

consumer_for(ref)[source]

Resolve consumer settings by logical reference with default fallback.

Parameters:

ref (str | LogicalRef) – Logical input reference.

Returns:

Specific consumer settings when present, otherwise the common consumer settings.

Raises:

KeyError – If neither a specific nor common consumer is configured.

Return type:

ConsumerSettings
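
The lookup order shared by producer_for() and consumer_for() (specific entry first, then the common entry, then KeyError) can be sketched as follows. SettingsSketch is a hypothetical stand-in that stores plain strings instead of settings objects and accepts only string references; it is meant to show the fallback order, not Loom's implementation.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class SettingsSketch:
    """Hypothetical stand-in for KafkaSettings; values are plain strings here."""

    consumer: Optional[str] = None
    consumers: dict[str, str] = field(default_factory=dict)

    def consumer_for(self, ref: str) -> str:
        # 1. A settings entry registered for this exact reference wins.
        specific = self.consumers.get(ref)
        if specific is not None:
            return specific
        # 2. Otherwise fall back to the common consumer settings.
        if self.consumer is not None:
            return self.consumer
        # 3. Nothing configured at all: signal it to the caller.
        raise KeyError(ref)


settings = SettingsSketch(consumer="common", consumers={"orders": "orders-specific"})
orders = settings.consumer_for("orders")
payments = settings.consumer_for("payments")
```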

exception loom.streaming.kafka.KafkaSerializationError[source]

Bases: KafkaProducerError

Raised when a record cannot be serialized for Kafka output.

class loom.streaming.kafka.MessageConsumer(*args, **kwargs)[source]

Bases: Protocol[PayloadT]

Typed message consumer contract.

Polls raw bytes from a Kafka consumer and decodes them into typed message envelopes via codec.

poll(timeout_ms)[source]

Read and decode one standard message envelope.

Parameters:

timeout_ms (int) – Maximum poll wait in milliseconds.

Returns:

One typed record carrying a decoded message envelope, or None when no record is available.

Return type:

KafkaRecord[MessageEnvelope[PayloadT]] | None

commit(*, asynchronous=False)[source]

Commit consumed offsets.

Parameters:

asynchronous (bool) – Whether the backend may commit asynchronously.

Return type:

None

commit_offset(partitions)[source]

Commit explicit offsets.

Parameters:

partitions (list[TopicPartition]) – Kafka topic-partition offsets to commit.

Return type:

None

close()[source]

Close the consumer and release resources.

Return type:

None

class loom.streaming.kafka.MessageDescriptor(message_type, message_version, content_type=<factory>, schema_ref=None)[source]

Bases: LoomFrozenStruct

Describe the stable identity of a Kafka message contract.

message_type

Stable logical message type name.

Type:

str

message_version

Logical message version.

Type:

int

content_type

Wire content descriptor.

Type:

ContentType

schema_ref

Optional schema reference.

Type:

SchemaRef | None

class loom.streaming.kafka.MessageEnvelope(meta, payload)[source]

Bases: LoomFrozenStruct, Generic[PayloadT]

Standard Kafka message envelope for Loom streaming.

meta

Envelope metadata.

Type:

MessageMetadata

payload

Typed payload body.

Type:

PayloadT

class loom.streaming.kafka.MessageMetadata(descriptor, trace_id=None, parent_trace_id=None, correlation_id=None, causation_id=None, produced_at_ms=<factory>)[source]

Bases: LoomFrozenStruct

Transport metadata for the standard Kafka message envelope.

descriptor

Stable message contract descriptor.

Type:

MessageDescriptor

trace_id

Trace identifier propagated across process boundaries.

Type:

str | None

parent_trace_id

Optional upstream trace identifier.

Type:

str | None

correlation_id

Correlation identifier shared across related messages.

Type:

str | None

causation_id

Optional upstream message identifier.

Type:

str | None

produced_at_ms

Producer timestamp in epoch milliseconds.

Type:

int

class loom.streaming.kafka.MessageProducer(*args, **kwargs)[source]

Bases: Protocol[PayloadContraT]

Typed message producer contract.

Builds a standard envelope, encodes via codec, and delegates to a raw Kafka producer.

send(*, topic, payload, descriptor, key=None, headers=None, correlation_id=None, parent_trace_id=None, causation_id=None, trace_id=None, produced_at_ms=None)[source]

Build, encode, and produce one standard message envelope.

Parameters:
  • topic (str) – Target Kafka topic.

  • payload (PayloadContraT) – Typed payload body.

  • descriptor (MessageDescriptor) – Stable message descriptor.

  • key (bytes | str | None) – Optional Kafka partition key.

  • headers (dict[str, bytes] | None) – Optional Kafka headers.

  • correlation_id (str | None) – Optional correlation identifier.

  • parent_trace_id (str | None) – Optional upstream trace identifier.

  • causation_id (str | None) – Optional upstream message identifier.

  • trace_id (str | None) – Optional explicit trace identifier.

  • produced_at_ms (int | None) – Optional producer timestamp in epoch milliseconds.

Return type:

None

flush(timeout_ms=None)[source]

Flush pending records.

Parameters:

timeout_ms (int | None) – Optional maximum flush wait in milliseconds.

Return type:

None

close()[source]

Flush and close the producer.

Return type:

None

class loom.streaming.kafka.MsgspecCodec[source]

Bases: Generic[PayloadT]

Direct MessagePack codec for Kafka message envelopes.

encode(message)[source]

Serialize one typed message envelope.

Parameters:

message (MessageEnvelope[PayloadT]) – Typed message envelope.

Returns:

MessagePack bytes.

Return type:

bytes

decode(raw, payload_type)[source]

Deserialize one typed message envelope.

Parameters:
  • raw (bytes) – Raw MessagePack bytes.

  • payload_type (type[PayloadT]) – Expected payload model type.

Returns:

Decoded typed message envelope.

Return type:

MessageEnvelope[PayloadT]

class loom.streaming.kafka.PartitionKeyResolver(*args, **kwargs)[source]

Bases: Protocol[PayloadT]

Compute the Kafka partition key for an outgoing record.

resolve(record)[source]

Return Kafka partition-key bytes.

Parameters:

record (KafkaRecord[PayloadT]) – Typed Kafka record to inspect.

Returns:

Partition-key bytes or None for broker-controlled routing.

Return type:

bytes | None
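
A custom resolver only needs a resolve(record) method with this shape. The sketch below derives the key from a payload field so that all records for one customer share a partition key. RecordSketch, OrderPlaced, and CustomerKey are hypothetical names defined locally for illustration; none of them come from Loom.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

PayloadT = TypeVar("PayloadT")


@dataclass(frozen=True)
class RecordSketch(Generic[PayloadT]):
    """Hypothetical stand-in for KafkaRecord with only the fields used here."""

    topic: str
    key: Optional[bytes]
    value: PayloadT


@dataclass(frozen=True)
class OrderPlaced:
    customer_id: str
    total_cents: int


class CustomerKey:
    """Resolver sketch: key every order by its customer identifier."""

    def resolve(self, record: RecordSketch[OrderPlaced]) -> Optional[bytes]:
        # Equal customer ids always produce equal key bytes.
        return record.value.customer_id.encode("utf-8")


record = RecordSketch(topic="orders", key=None, value=OrderPlaced("c-42", 1999))
key = CustomerKey().resolve(record)
```

Returning None instead of bytes would leave partition selection to the default routing, as the return description above notes.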

class loom.streaming.kafka.PreserveKey[source]

Bases: Generic[PayloadT]

Preserve an existing record key when present.

resolve(record)[source]

Return the existing record key encoded as bytes.

Parameters:

record (KafkaRecord[PayloadT]) – Typed Kafka record.

Returns:

Existing key bytes or None.

Return type:

bytes | None

class loom.streaming.kafka.ProducerSettings(*, brokers, client_id=None, topic=None, security=None, extra=<factory>)[source]

Bases: LoomFrozenStruct

Typed Kafka producer settings.

brokers

Kafka broker addresses.

Type:

tuple[str, ...]

client_id

Producer client identifier.

Type:

str | None

topic

Optional default physical topic for a logical output.

Type:

str | None

security

Optional security configuration.

Type:

KafkaSecuritySettings | None

extra

Optional extra Confluent settings.

Type:

dict[str, str | int | float | bool]

to_confluent_config()[source]

Compile settings to a Confluent-compatible config mapping.

Returns:

String-keyed Confluent configuration mapping.

Raises:

ValueError – If extra redefines a typed configuration key.

Return type:

dict[str, str | int | float | bool]

loom.streaming.kafka.resolve_consumer_topics(ref, settings)[source]

Resolve physical consumer topics from settings or logical fallback.

Parameters:
  • ref (str | LogicalRef) – Logical input reference used as fallback only when the config does not define topics.

  • settings (ConsumerSettings) – Resolved consumer settings.

Returns:

Physical topic names.

Return type:

tuple[str, ...]

loom.streaming.kafka.resolve_producer_topic(ref, settings)[source]

Resolve physical producer topic from settings or logical fallback.

Parameters:
  • ref (str | LogicalRef) – Logical output reference used as fallback only when the config does not define topic.

  • settings (ProducerSettings) – Resolved producer settings.

Returns:

Physical topic name.

Return type:

str
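
The fallback rule shared by resolve_consumer_topics() and resolve_producer_topic() is small enough to sketch directly. The two functions below are hypothetical stand-ins that take the configured value instead of a settings object and accept only string references. Treating an empty tuple as "no topics defined" is an assumption.

```python
from typing import Optional


def resolve_producer_topic_sketch(ref: str, configured_topic: Optional[str]) -> str:
    """Prefer the configured physical topic; fall back to the logical reference."""
    return configured_topic if configured_topic is not None else ref


def resolve_consumer_topics_sketch(
    ref: str, configured_topics: tuple[str, ...]
) -> tuple[str, ...]:
    """Prefer configured topics; fall back to a one-element tuple with the reference."""
    # Assumption: an empty tuple counts as "the config does not define topics".
    return configured_topics if configured_topics else (ref,)


explicit = resolve_producer_topic_sketch("orders", "prod.orders.v1")
fallback = resolve_producer_topic_sketch("orders", None)
```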

class loom.streaming.kafka.SchemaRef(namespace, name, version, format)[source]

Bases: LoomFrozenStruct

Reference a payload schema contract without forcing one registry.

namespace

Schema authority or logical owner.

Type:

str

name

Stable schema or message name.

Type:

str

version

Schema version identifier.

Type:

str

format

Schema representation identifier.

Type:

str

loom.streaming.kafka.build_message(payload, descriptor, *, correlation_id=None, parent_trace_id=None, causation_id=None, trace_id=None, produced_at_ms=None)[source]

Build the standard Kafka message envelope.

The caller usually provides only the payload and descriptor. Trace context is taken from the active Loom tracing context when not supplied; when none is active, a fresh trace identifier is generated so the envelope still participates in lineage.

Parameters:
  • payload (PayloadT) – Typed message payload.

  • descriptor (MessageDescriptor) – Stable message contract descriptor.

  • correlation_id (str | None) – Optional correlation identifier.

  • parent_trace_id (str | None) – Optional upstream trace identifier.

  • causation_id (str | None) – Optional upstream message identifier.

  • trace_id (str | None) – Optional explicit trace identifier.

  • produced_at_ms (int | None) – Optional producer timestamp in epoch milliseconds.

Returns:

A typed message envelope ready for serialization.

Return type:

MessageEnvelope[PayloadT]

loom.streaming.kafka.envelope_to_message(envelope, record)[source]

Convert a Kafka wire envelope and record context to a DSL message.

Returns:

Transport-neutral streaming message with envelope metadata preserved where it can influence DSL routing or user logic. If the envelope has no trace identifier, a fresh one is generated so the message still participates in lineage.

Return type:

Message[PayloadT]

loom.streaming.kafka.load_kafka_settings(*config_files, section_name='kafka', resolvers=())[source]

Load Kafka settings from YAML using the shared core config loader.

Parameters:
  • *config_files (str) – One or more local paths or cloud URIs.

  • section_name (str) – Dot-separated config section path. Defaults to "kafka".

  • resolvers (Sequence[ConfigResolver]) – Optional core config resolvers for custom placeholders.

Returns:

Validated Kafka settings.

Raises:

loom.core.config.ConfigError – If files cannot be loaded, the section is missing, or the section fails validation.

Return type:

KafkaSettings

loom.streaming.kafka.try_decode_multi_record(record, dispatch, codec)[source]

Decode one Kafka record from a heterogeneous topic using a dispatch table.

Uses an exact-type dispatch strategy:

  1. Plain payload envelopes dispatch by MessageEnvelope.meta.descriptor.message_type.

  2. Business/task/routing error envelopes dispatch by their outer error kind type and inner ErrorEnvelope.payload_type.

  3. Wire-decode payloads dispatch by the dedicated wire error type.

Unknown message_type values and decode failures both produce a DecodeError with ErrorKind.WIRE.

Parameters:
  • record (KafkaRecord[bytes]) – Raw Kafka record from a heterogeneous topic.

  • dispatch (DispatchTable) – Pre-built dispatch table keyed by message_type and error payload_type strings.

  • codec (KafkaCodec[Any]) – Codec used for full envelope decoding.

Returns:

DecodeOk on success, DecodeError on probe or decode failure.

Return type:

DecodeOk[Any] | DecodeError
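
The first dispatch step (exact match on message_type, with unknown types and decode failures both turned into an error value) can be sketched as below. Everything here is a hypothetical stand-in: the JSON wire format, the PLAIN_TABLE mapping, the payload classes, and WireErrorSketch are assumptions for illustration, and the error-envelope and wire-error tables of the real DispatchTable are not modelled.

```python
import json
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str


@dataclass(frozen=True)
class OrderCancelled:
    order_id: str
    reason: str


@dataclass(frozen=True)
class WireErrorSketch:
    """Hypothetical stand-in for a DecodeError carrying the raw bytes."""

    reason: str
    raw: bytes


# Exact-match table from message_type string to payload class.
PLAIN_TABLE = {"order.placed": OrderPlaced, "order.cancelled": OrderCancelled}


def decode_multi_sketch(raw: bytes) -> Union[Any, WireErrorSketch]:
    """Probe message_type, look up the payload class, and build the payload."""
    try:
        body = json.loads(raw.decode("utf-8"))
        payload_type = PLAIN_TABLE[body["message_type"]]
        return payload_type(**body["payload"])
    except (ValueError, KeyError, TypeError) as exc:
        # Malformed bytes, unknown message_type, and bad fields all end up here.
        return WireErrorSketch(reason=f"{type(exc).__name__}: {exc}", raw=raw)


decoded = decode_multi_sketch(
    b'{"message_type": "order.placed", "payload": {"order_id": "o-1"}}'
)
```

Returning an error value instead of raising lets one poll loop route good and bad records from the same topic without a try/except around every record.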

loom.streaming.kafka.try_decode_record(record, payload_type, codec)[source]

Decode one Kafka record to a DSL message without raising decode errors.

Parameters:
  • record (KafkaRecord[bytes]) – Raw Kafka record whose value contains a Loom message envelope.

  • payload_type (type[PayloadT]) – Expected payload model type.

  • codec (KafkaCodec[PayloadT]) – Codec used to decode the envelope bytes.

Returns:

DecodeOk when decoding succeeds, otherwise DecodeError carrying the original raw bytes and Kafka context needed by a DLQ sink.

Return type:

DecodeOk[PayloadT] | DecodeError
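
A caller of a non-raising decoder typically branches on the result type and forwards failures, with their raw bytes and topic, to a dead-letter sink. The sketch below shows that pattern with local stand-ins: OkSketch, ErrSketch, try_decode_sketch, and route are hypothetical names, JSON replaces the real codec, and plain lists stand in for the downstream pipeline and the DLQ.

```python
import json
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class OkSketch:
    """Hypothetical stand-in for DecodeOk."""

    message: dict


@dataclass(frozen=True)
class ErrSketch:
    """Hypothetical stand-in for DecodeError with a subset of its context."""

    error: str
    raw: bytes
    topic: str


def try_decode_sketch(raw: bytes, topic: str) -> Union[OkSketch, ErrSketch]:
    """Decode without raising: failures become ErrSketch values."""
    try:
        body = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return ErrSketch(error=str(exc), raw=raw, topic=topic)
    if not isinstance(body, dict):
        return ErrSketch(error="expected a JSON object", raw=raw, topic=topic)
    return OkSketch(message=body)


def route(result: Union[OkSketch, ErrSketch], main: list, dead_letters: list) -> None:
    """Send decoded messages onward and failed records to the dead-letter list."""
    if isinstance(result, OkSketch):
        main.append(result.message)
    else:
        # Keep the untouched bytes so the record can be inspected or replayed.
        dead_letters.append((result.topic, result.raw))


main_sink: list = []
dlq_sink: list = []
route(try_decode_sketch(b'{"id": 1}', "orders"), main_sink, dlq_sink)
route(try_decode_sketch(b"\xff\xfe", "orders"), main_sink, dlq_sink)
```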