loom.streaming.kafka¶
Kafka transport contracts for Loom streaming.
- class loom.streaming.kafka.ContentType(media_type, encoding=None)[source]¶
Bases:
LoomFrozenStructDescribe the logical wire content type of one Kafka payload.
- classmethod msgpack()[source]¶
Return the default Loom MessagePack content type.
- Returns:
Standard MessagePack content-type descriptor for Loom-native payloads.
- Return type:
- class loom.streaming.kafka.ConsumerSettings(*, brokers, group_id, topics, auto_offset_reset='earliest', poll_timeout_ms=100, enable_auto_commit=True, security=None, extra=<factory>)[source]¶
Bases:
LoomFrozenStructTyped Kafka consumer settings.
- Parameters:
- auto_offset_reset¶
Offset reset policy.
- Type:
Literal[‘earliest’, ‘latest’]
- poll_timeout_ms¶
Maximum milliseconds to block waiting for a message on each poll call. Higher values reduce CPU usage when the topic is idle; lower values decrease end-to-end latency. Defaults to 100.
- Type:
- security¶
Optional security configuration.
- Type:
- class loom.streaming.kafka.DecodeError(error, raw, topic, key, headers, partition=None, offset=None, timestamp_ms=None)[source]¶
Bases:
LoomFrozenStructFailed Kafka wire decode result with raw dead-letter context.
- Parameters:
error (ErrorEnvelope[Any]) – Structured WIRE error for DSL error routing.
raw (bytes) – Original record bytes that failed to decode.
topic (str) – Source Kafka topic.
key (bytes | None) – Source Kafka key, normalized to bytes when available.
partition (int | None) – Source Kafka partition when available.
offset (int | None) – Source Kafka offset when available.
timestamp_ms (int | None) – Source Kafka timestamp when available.
- class loom.streaming.kafka.DecodeOk(message)[source]¶
Bases:
LoomFrozenStruct,Generic[PayloadT]Successful Kafka wire decode result.
- Parameters:
message (Message[PayloadT]) – Transport-neutral message ready for DSL execution.
- class loom.streaming.kafka.DispatchTable(plain, error, wire)[source]¶
Bases:
objectPre-built decode dispatch table for heterogeneous Kafka topics.
- Parameters:
plain (Mapping[str, type[LoomStruct | LoomFrozenStruct]]) – Maps outer
message_typestrings to their payload types.error (Mapping[str, Any]) – Maps inner
ErrorEnvelope.payload_typestrings to the correspondingErrorEnvelope[T]generic alias.wire (Mapping[str, type[LoomStruct | LoomFrozenStruct]]) – Maps outer wire-error
message_typestrings to their payload types.
- class loom.streaming.kafka.FixedKey(value)[source]¶
Bases:
LoomFrozenStructAlways use one fixed partition key.
- Parameters:
value (bytes)
- exception loom.streaming.kafka.KafkaClientError[source]¶
Bases:
ExceptionBase error for Kafka transport operations.
- class loom.streaming.kafka.KafkaCodec(*args, **kwargs)[source]¶
Bases:
Protocol[PayloadT]Encode and decode Kafka message envelopes.
- encode(message)[source]¶
Serialize one message envelope to bytes.
- Parameters:
message (MessageEnvelope[PayloadT]) – Typed message envelope.
- Returns:
Encoded message bytes.
- Return type:
- decode(raw, payload_type)[source]¶
Deserialize bytes to one message envelope.
- Parameters:
- Returns:
Decoded typed message envelope.
- Return type:
MessageEnvelope[PayloadT]
- exception loom.streaming.kafka.KafkaCommitError[source]¶
Bases:
KafkaConsumerErrorRaised when Kafka offset commit fails.
- exception loom.streaming.kafka.KafkaConfigurationError[source]¶
Bases:
KafkaClientErrorRaised when client configuration is invalid or incomplete.
- class loom.streaming.kafka.KafkaConsumer(*args, **kwargs)[source]¶
Bases:
ProtocolRaw Kafka consumer contract.
Returns
KafkaRecord[bytes]from Kafka. No codec, no envelope, no deserialization — only raw byte transport.- poll(timeout_ms)[source]¶
Read one raw byte record.
- Parameters:
timeout_ms (int) – Maximum poll wait in milliseconds.
- Returns:
One raw Kafka record or
Nonewhen no record is available.- Return type:
KafkaRecord[bytes] | None
- commit(*, asynchronous=False)[source]¶
Commit consumed offsets.
- Parameters:
asynchronous (bool) – Whether the backend may commit asynchronously.
- Return type:
None
- class loom.streaming.kafka.KafkaConsumerClient(settings, obs=None)[source]¶
Bases:
objectConfluent-backed raw Kafka consumer.
Returns
KafkaRecord[bytes]from Kafka. No deserialization — values are raw bytes as received from the broker.- Parameters:
settings (ConsumerSettings) – Typed consumer settings.
observer – Optional observability observer.
obs (ObservabilityRuntime | None)
- poll(timeout_ms)[source]¶
Read one raw byte record from Kafka.
- Parameters:
timeout_ms (int) – Maximum poll wait in milliseconds.
- Returns:
One raw Kafka record or
Nonewhen no record is available.- Raises:
KafkaPollError – If the backend poll fails or returns a broker error.
- Return type:
KafkaRecord[bytes] | None
- commit(*, asynchronous=False)[source]¶
Commit consumed offsets.
- Parameters:
asynchronous (bool) – Whether the backend may commit asynchronously.
- Raises:
KafkaCommitError – If the backend commit fails.
- Return type:
None
- commit_offset(partitions)[source]¶
Commit explicit Kafka offsets.
- Parameters:
partitions (list[TopicPartition]) – Kafka topic-partition offsets to commit.
- Raises:
KafkaCommitError – If the backend commit fails.
- Return type:
None
- exception loom.streaming.kafka.KafkaConsumerError[source]¶
Bases:
KafkaClientErrorBase error for consumer-side failures.
- exception loom.streaming.kafka.KafkaDeliveryError[source]¶
Bases:
KafkaProducerErrorRaised when Kafka delivery fails.
- exception loom.streaming.kafka.KafkaDeserializationError[source]¶
Bases:
KafkaConsumerErrorRaised when Kafka payload bytes cannot be decoded.
- class loom.streaming.kafka.KafkaMessageConsumer(raw, codec, payload_type, obs=None)[source]¶
Bases:
Generic[PayloadT]Polls raw bytes from a Kafka consumer and decodes them via codec.
All dependencies are injected via constructor — the message consumer depends on the
KafkaConsumerandKafkaCodecprotocols, not on concrete implementations.- Parameters:
raw (KafkaConsumer) – Raw Kafka consumer for byte-level transport.
codec (KafkaCodec[PayloadT]) – Codec for decoding bytes to message envelopes.
payload_type (type[PayloadT]) – Expected payload model type for decoding.
observer – Optional observability observer.
obs (ObservabilityRuntime | None)
- poll(timeout_ms)[source]¶
Read and decode one standard message envelope from Kafka.
- Parameters:
timeout_ms (int) – Maximum poll wait in milliseconds.
- Returns:
One typed record carrying a decoded message envelope, or
Nonewhen no record is available.- Raises:
KafkaPollError – If the backend poll fails.
KafkaDeserializationError – If the envelope cannot be decoded.
- Return type:
KafkaRecord[MessageEnvelope[PayloadT]] | None
- commit(*, asynchronous=False)[source]¶
Commit consumed offsets through the raw consumer.
- Parameters:
asynchronous (bool) – Whether the backend may commit asynchronously.
- Return type:
None
- class loom.streaming.kafka.KafkaMessageProducer(raw, codec, key_resolver=None, use_message_timestamp=True, obs=None)[source]¶
Bases:
Generic[PayloadT]Builds a standard envelope, encodes via codec, delegates to a raw producer.
All dependencies are injected via constructor — the message producer depends on the
KafkaProducerandKafkaCodecprotocols, not on concrete implementations.- Parameters:
raw (KafkaProducer) – Raw Kafka producer for byte-level transport.
codec (KafkaCodec[PayloadT]) – Codec for encoding message envelopes to bytes.
key_resolver (PartitionKeyResolver[MessageEnvelope[PayloadT]] | None) – Optional resolver used when
senddoes not receive an explicit key.use_message_timestamp (bool) – Whether Kafka records should carry the envelope
produced_at_mstimestamp.observer – Optional observability observer.
obs (ObservabilityRuntime | None)
- send(*, topic, payload, descriptor, key=None, headers=None, correlation_id=None, parent_trace_id=None, causation_id=None, trace_id=None, produced_at_ms=None)[source]¶
Build, encode, and produce one standard message envelope.
- Parameters:
topic (str) – Target Kafka topic.
payload (PayloadT) – Typed payload body.
descriptor (MessageDescriptor) – Stable message descriptor.
correlation_id (str | None) – Optional correlation identifier.
parent_trace_id (str | None) – Optional upstream trace identifier.
causation_id (str | None) – Optional upstream message identifier.
trace_id (str | None) – Optional explicit trace identifier.
produced_at_ms (int | None) – Optional producer timestamp in epoch milliseconds.
- Raises:
KafkaSerializationError – If the envelope cannot be encoded.
KafkaDeliveryError – If Kafka rejects the produce call.
- Return type:
None
- flush(timeout_ms=None)[source]¶
Flush pending records.
- Parameters:
timeout_ms (int | None) – Optional maximum flush wait in milliseconds.
- Raises:
KafkaDeliveryError – If flush fails or pending delivery errors remain.
- Return type:
None
- close()[source]¶
Flush and close the producer.
- Raises:
KafkaDeliveryError – If pending delivery failures remain.
- Return type:
None
- exception loom.streaming.kafka.KafkaPollError[source]¶
Bases:
KafkaConsumerErrorRaised when Kafka polling returns a backend error.
- class loom.streaming.kafka.KafkaProducer(*args, **kwargs)[source]¶
Bases:
ProtocolRaw Kafka producer contract.
Sends
KafkaRecord[bytes]to Kafka. No codec, no envelope, no serialization — only raw byte transport.- send(record)[source]¶
Produce one raw byte record.
- Parameters:
record (KafkaRecord[bytes]) – Kafka record with a
bytesvalue.- Return type:
None
- class loom.streaming.kafka.KafkaProducerClient(settings, delivery_callback=None, obs=None)[source]¶
Bases:
objectConfluent-backed raw Kafka producer.
Sends
KafkaRecord[bytes]to Kafka. All values must already be serialized to bytes before callingsend().- Parameters:
settings (ProducerSettings) – Typed producer settings.
delivery_callback (DeliveryCallback | None) – Optional callback notified on delivery success or failure.
observer – Optional observability observer.
obs (ObservabilityRuntime | None)
- send(record)[source]¶
Produce one raw byte record.
- Parameters:
record (KafkaRecord[bytes]) – Kafka record with a
bytesvalue.- Raises:
KafkaDeliveryError – If Kafka rejects the produce call.
- Return type:
None
- flush(timeout_ms=None)[source]¶
Flush pending records and materialize delivery failures.
- Parameters:
timeout_ms (int | None) – Optional maximum flush wait in milliseconds.
- Raises:
KafkaDeliveryError – If flush fails or a pending delivery error exists. Pending delivery errors are consumed when raised, so a later
flushcall will not raise the same error again.- Return type:
None
- close()[source]¶
Flush and close the producer.
- Raises:
KafkaDeliveryError – If pending delivery failures remain.
- Return type:
None
- exception loom.streaming.kafka.KafkaProducerError[source]¶
Bases:
KafkaClientErrorBase error for producer-side failures.
- class loom.streaming.kafka.KafkaRecord(topic, key, value, headers=<factory>, partition=None, offset=None, timestamp_ms=None)[source]¶
Bases:
LoomFrozenStruct,Generic[PayloadT]Typed Kafka transport record.
- Parameters:
- value¶
Typed record value.
- Type:
loom.streaming.kafka._record.PayloadT
- class loom.streaming.kafka.KafkaSecuritySettings(*, protocol, sasl_mechanism=None, sasl_username=None, sasl_password=None, ssl_ca_location=None)[source]¶
Bases:
LoomFrozenStructOptional Kafka security settings.
- Parameters:
- protocol¶
Kafka security protocol.
- Type:
Literal[‘PLAINTEXT’, ‘SSL’, ‘SASL_SSL’, ‘SASL_PLAINTEXT’]
- class loom.streaming.kafka.KafkaSettings(*, producer=None, consumer=None, producers=<factory>, consumers=<factory>)[source]¶
Bases:
LoomFrozenStructTyped Kafka settings loaded from a YAML config section.
- Parameters:
producer (ProducerSettings | None) – Optional producer settings.
consumer (ConsumerSettings | None) – Optional consumer settings.
producers (dict[str, ProducerSettings]) – Named producer settings keyed by logical output reference.
consumers (dict[str, ConsumerSettings]) – Named consumer settings keyed by logical input reference.
- producer_for(ref)[source]¶
Resolve producer settings by logical reference with default fallback.
- consumer_for(ref)[source]¶
Resolve consumer settings by logical reference with default fallback.
- exception loom.streaming.kafka.KafkaSerializationError[source]¶
Bases:
KafkaProducerErrorRaised when a record cannot be serialized for Kafka output.
- class loom.streaming.kafka.MessageConsumer(*args, **kwargs)[source]¶
Bases:
Protocol[PayloadT]Typed message consumer contract.
Polls raw bytes from a Kafka consumer and decodes them into typed message envelopes via codec.
- poll(timeout_ms)[source]¶
Read and decode one standard message envelope.
- Parameters:
timeout_ms (int) – Maximum poll wait in milliseconds.
- Returns:
One typed record carrying a decoded message envelope, or
Nonewhen no record is available.- Return type:
KafkaRecord[MessageEnvelope[PayloadT]] | None
- commit(*, asynchronous=False)[source]¶
Commit consumed offsets.
- Parameters:
asynchronous (bool) – Whether the backend may commit asynchronously.
- Return type:
None
- class loom.streaming.kafka.MessageDescriptor(message_type, message_version, content_type=<factory>, schema_ref=None)[source]¶
Bases:
LoomFrozenStructDescribe the stable identity of a Kafka message contract.
- Parameters:
message_type (str)
message_version (int)
content_type (ContentType)
schema_ref (SchemaRef | None)
- content_type¶
Wire content descriptor.
- schema_ref¶
Optional schema reference.
- Type:
- class loom.streaming.kafka.MessageEnvelope(meta, payload)[source]¶
Bases:
LoomFrozenStruct,Generic[PayloadT]Standard Kafka message envelope for Loom streaming.
- Parameters:
meta (MessageMetadata)
payload (PayloadT)
- meta¶
Envelope metadata.
- payload¶
Typed payload body.
- Type:
loom.streaming.kafka._message.PayloadT
- class loom.streaming.kafka.MessageMetadata(descriptor, trace_id=None, parent_trace_id=None, correlation_id=None, causation_id=None, produced_at_ms=<factory>)[source]¶
Bases:
LoomFrozenStructTransport metadata for the standard Kafka message envelope.
- Parameters:
- descriptor¶
Stable message contract descriptor.
- class loom.streaming.kafka.MessageProducer(*args, **kwargs)[source]¶
Bases:
Protocol[PayloadContraT]Typed message producer contract.
Builds a standard envelope, encodes via codec, and delegates to a raw Kafka producer.
- send(*, topic, payload, descriptor, key=None, headers=None, correlation_id=None, parent_trace_id=None, causation_id=None, trace_id=None, produced_at_ms=None)[source]¶
Build, encode, and produce one standard message envelope.
- Parameters:
topic (str) – Target Kafka topic.
payload (PayloadContraT) – Typed payload body.
descriptor (MessageDescriptor) – Stable message descriptor.
correlation_id (str | None) – Optional correlation identifier.
parent_trace_id (str | None) – Optional upstream trace identifier.
causation_id (str | None) – Optional upstream message identifier.
trace_id (str | None) – Optional explicit trace identifier.
produced_at_ms (int | None) – Optional producer timestamp in epoch milliseconds.
- Return type:
None
- class loom.streaming.kafka.MsgspecCodec[source]¶
Bases:
Generic[PayloadT]Direct MessagePack codec for Kafka message envelopes.
- encode(message)[source]¶
Serialize one typed message envelope.
- Parameters:
message (MessageEnvelope[PayloadT]) – Typed message envelope.
- Returns:
MessagePack bytes.
- Return type:
- decode(raw, payload_type)[source]¶
Deserialize one typed message envelope.
- Parameters:
- Returns:
Decoded typed message envelope.
- Return type:
MessageEnvelope[PayloadT]
- class loom.streaming.kafka.PartitionKeyResolver(*args, **kwargs)[source]¶
Bases:
Protocol[PayloadT]Compute the Kafka partition key for an outgoing record.
- resolve(record)[source]¶
Return Kafka partition-key bytes.
- Parameters:
record (KafkaRecord[PayloadT]) – Typed Kafka record to inspect.
- Returns:
Partition-key bytes or
Nonefor broker-controlled routing.- Return type:
bytes | None
- class loom.streaming.kafka.PreserveKey[source]¶
Bases:
Generic[PayloadT]Preserve an existing record key when present.
- resolve(record)[source]¶
Return the existing record key encoded as bytes.
- Parameters:
record (KafkaRecord[PayloadT]) – Typed Kafka record.
- Returns:
Existing key bytes or
None.- Return type:
bytes | None
- class loom.streaming.kafka.ProducerSettings(*, brokers, client_id=None, topic=None, security=None, extra=<factory>)[source]¶
Bases:
LoomFrozenStructTyped Kafka producer settings.
- Parameters:
- security¶
Optional security configuration.
- Type:
- loom.streaming.kafka.resolve_consumer_topics(ref, settings)[source]¶
Resolve physical consumer topics from settings or logical fallback.
- Parameters:
ref (str | LogicalRef) – Logical input reference used as fallback only when the config does not define
topics.settings (ConsumerSettings) – Resolved consumer settings.
- Returns:
Physical topic names.
- Return type:
- loom.streaming.kafka.resolve_producer_topic(ref, settings)[source]¶
Resolve physical producer topic from settings or logical fallback.
- Parameters:
ref (str | LogicalRef) – Logical output reference used as fallback only when the config does not define
topic.settings (ProducerSettings) – Resolved producer settings.
- Returns:
Physical topic name.
- Return type:
- class loom.streaming.kafka.SchemaRef(namespace, name, version, format)[source]¶
Bases:
LoomFrozenStructReference a payload schema contract without forcing one registry.
- loom.streaming.kafka.build_message(payload, descriptor, *, correlation_id=None, parent_trace_id=None, causation_id=None, trace_id=None, produced_at_ms=None)[source]¶
Build the standard Kafka message envelope.
The caller usually provides only the payload and descriptor. Trace context is taken from the active Loom tracing context when not supplied; when none is active, a fresh trace identifier is generated so the envelope still participates in lineage.
- Parameters:
payload (PayloadT) – Typed message payload.
descriptor (MessageDescriptor) – Stable message contract descriptor.
correlation_id (str | None) – Optional correlation identifier.
parent_trace_id (str | None) – Optional upstream trace identifier.
causation_id (str | None) – Optional upstream message identifier.
trace_id (str | None) – Optional explicit trace identifier.
produced_at_ms (int | None) – Optional producer timestamp in epoch milliseconds.
- Returns:
A typed message envelope ready for serialization.
- Return type:
MessageEnvelope[PayloadT]
- loom.streaming.kafka.envelope_to_message(envelope, record)[source]¶
Convert a Kafka wire envelope and record context to a DSL message.
- Parameters:
envelope (MessageEnvelope[PayloadT]) – Decoded standard Kafka message envelope.
record (KafkaRecord[bytes]) – Original Kafka transport record.
- Returns:
Transport-neutral streaming message with envelope metadata preserved where it can influence DSL routing or user logic. If the envelope has no trace identifier, a fresh one is generated so the message still participates in lineage.
- Return type:
Message[PayloadT]
- loom.streaming.kafka.load_kafka_settings(*config_files, section_name='kafka', resolvers=())[source]¶
Load Kafka settings from YAML using the shared core config loader.
- Parameters:
*config_files (str) – One or more local paths or cloud URIs.
section_name (str) – Dot-separated config section path. Defaults to
"kafka".resolvers (Sequence[ConfigResolver]) – Optional core config resolvers for custom placeholders.
- Returns:
Validated Kafka settings.
- Raises:
loom.core.config.ConfigError – If files cannot be loaded, the section is missing, or the section fails validation.
- Return type:
- loom.streaming.kafka.try_decode_multi_record(record, dispatch, codec)[source]¶
Decode one Kafka record from a heterogeneous topic using a dispatch table.
Uses an exact-type dispatch strategy:
Plain payload envelopes dispatch by
MessageEnvelope.meta.descriptor.message_type.Business/task/routing error envelopes dispatch by their outer error kind type and inner
ErrorEnvelope.payload_type.Wire-decode payloads dispatch by the dedicated wire error type.
Unknown
message_typevalues and decode failures both produce aDecodeErrorwithErrorKind.WIRE.- Parameters:
record (KafkaRecord[bytes]) – Raw Kafka record from a heterogeneous topic.
dispatch (DispatchTable) – Pre-built dispatch table keyed by
message_typeand errorpayload_typestrings.codec (KafkaCodec[Any]) – Codec used for full envelope decoding.
- Returns:
DecodeOkon success,DecodeErroron probe or decode failure.- Return type:
- loom.streaming.kafka.try_decode_record(record, payload_type, codec)[source]¶
Decode one Kafka record to a DSL message without raising decode errors.
- Parameters:
record (KafkaRecord[bytes]) – Raw Kafka record whose value contains a Loom message envelope.
payload_type (type[PayloadT]) – Expected payload model type.
codec (KafkaCodec[PayloadT]) – Codec used to decode the envelope bytes.
- Returns:
DecodeOkwhen decoding succeeds, otherwiseDecodeErrorcarrying the original raw bytes and Kafka context needed by a DLQ sink.- Return type:
DecodeOk[PayloadT] | DecodeError