Source code for loom.streaming.kafka._config

"""Typed Kafka configuration contracts."""

from __future__ import annotations

from collections.abc import Sequence
from typing import Literal, TypeAlias

import msgspec

from loom.core.config import ConfigResolver, load_config, section
from loom.core.model import LoomFrozenStruct
from loom.core.routing import DefaultingRouteResolver, LogicalRef

KafkaConfigValue: TypeAlias = str | int | float | bool


[docs] class KafkaSecuritySettings(LoomFrozenStruct, frozen=True, kw_only=True): """Optional Kafka security settings. Attributes: protocol: Kafka security protocol. sasl_mechanism: Optional SASL mechanism. sasl_username: Optional SASL username. sasl_password: Optional SASL password. ssl_ca_location: Optional CA file path. """ protocol: Literal["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"] sasl_mechanism: str | None = None sasl_username: str | None = None sasl_password: str | None = None ssl_ca_location: str | None = None
[docs] def to_confluent_config(self) -> dict[str, KafkaConfigValue]: """Compile security settings to Confluent-compatible keys. Returns: String-keyed Confluent security configuration mapping. """ return _without_none( { "security.protocol": self.protocol, "sasl.mechanism": self.sasl_mechanism, "sasl.username": self.sasl_username, "sasl.password": self.sasl_password, "ssl.ca.location": self.ssl_ca_location, } )
[docs] class ProducerSettings(LoomFrozenStruct, frozen=True, kw_only=True): """Typed Kafka producer settings. Attributes: brokers: Kafka broker addresses. client_id: Producer client identifier. topic: Optional default physical topic for a logical output. security: Optional security configuration. extra: Optional extra Confluent settings. """ brokers: tuple[str, ...] client_id: str | None = None topic: str | None = None security: KafkaSecuritySettings | None = None extra: dict[str, KafkaConfigValue] = msgspec.field(default_factory=dict)
[docs] def to_confluent_config(self) -> dict[str, KafkaConfigValue]: """Compile settings to a Confluent-compatible config mapping. Returns: String-keyed Confluent configuration mapping. Raises: ValueError: If ``extra`` redefines a typed configuration key. """ managed = { "bootstrap.servers": _broker_list(self.brokers), **_without_none({"client.id": self.client_id}), **_security_config(self.security), } return _merge_extra_config(managed, self.extra)
[docs] class ConsumerSettings(LoomFrozenStruct, frozen=True, kw_only=True): """Typed Kafka consumer settings. Attributes: brokers: Kafka broker addresses. group_id: Consumer group identifier. topics: Topics to subscribe to. auto_offset_reset: Offset reset policy. poll_timeout_ms: Maximum milliseconds to block waiting for a message on each poll call. Higher values reduce CPU usage when the topic is idle; lower values decrease end-to-end latency. Defaults to 100. enable_auto_commit: Whether Kafka should auto-commit offsets. security: Optional security configuration. extra: Optional extra Confluent settings. """ brokers: tuple[str, ...] group_id: str topics: tuple[str, ...] auto_offset_reset: Literal["earliest", "latest"] = "earliest" poll_timeout_ms: int = 100 enable_auto_commit: bool = True security: KafkaSecuritySettings | None = None extra: dict[str, KafkaConfigValue] = msgspec.field(default_factory=dict)
[docs] def to_confluent_config(self) -> dict[str, KafkaConfigValue]: """Compile settings to a Confluent-compatible config mapping. Returns: String-keyed Confluent configuration mapping. Raises: ValueError: If ``extra`` redefines a typed configuration key. """ managed = { "bootstrap.servers": _broker_list(self.brokers), "group.id": self.group_id, "auto.offset.reset": self.auto_offset_reset, "enable.auto.commit": self.enable_auto_commit, **_security_config(self.security), } return _merge_extra_config(managed, self.extra)
[docs] class KafkaSettings(LoomFrozenStruct, frozen=True, kw_only=True): """Typed Kafka settings loaded from a YAML config section. Args: producer: Optional producer settings. consumer: Optional consumer settings. producers: Named producer settings keyed by logical output reference. consumers: Named consumer settings keyed by logical input reference. """ producer: ProducerSettings | None = None consumer: ConsumerSettings | None = None producers: dict[str, ProducerSettings] = msgspec.field(default_factory=dict) consumers: dict[str, ConsumerSettings] = msgspec.field(default_factory=dict)
[docs] def producer_for(self, ref: str | LogicalRef) -> ProducerSettings: """Resolve producer settings by logical reference with default fallback. Args: ref: Logical output reference. Returns: Specific producer settings when present, otherwise the common producer settings. Raises: KeyError: If neither a specific nor common producer is configured. """ resolver = DefaultingRouteResolver( default=self.producer, overrides=self.producers, kind="Kafka producer config", ) return resolver.resolve(ref)
[docs] def consumer_for(self, ref: str | LogicalRef) -> ConsumerSettings: """Resolve consumer settings by logical reference with default fallback. Args: ref: Logical input reference. Returns: Specific consumer settings when present, otherwise the common consumer settings. Raises: KeyError: If neither a specific nor common consumer is configured. """ resolver = DefaultingRouteResolver( default=self.consumer, overrides=self.consumers, kind="Kafka consumer config", ) return resolver.resolve(ref)
[docs] def resolve_producer_topic(ref: str | LogicalRef, settings: ProducerSettings) -> str: """Resolve physical producer topic from settings or logical fallback. Args: ref: Logical output reference used as fallback only when the config does not define ``topic``. settings: Resolved producer settings. Returns: Physical topic name. """ logical_ref = ref.ref if isinstance(ref, LogicalRef) else ref return settings.topic or logical_ref
[docs] def resolve_consumer_topics( ref: str | LogicalRef, settings: ConsumerSettings, ) -> tuple[str, ...]: """Resolve physical consumer topics from settings or logical fallback. Args: ref: Logical input reference used as fallback only when the config does not define ``topics``. settings: Resolved consumer settings. Returns: Physical topic names. """ if settings.topics: return settings.topics logical_ref = ref.ref if isinstance(ref, LogicalRef) else ref return (logical_ref,)
[docs] def load_kafka_settings( *config_files: str, section_name: str = "kafka", resolvers: Sequence[ConfigResolver] = (), ) -> KafkaSettings: """Load Kafka settings from YAML using the shared core config loader. Args: *config_files: One or more local paths or cloud URIs. section_name: Dot-separated config section path. Defaults to ``"kafka"``. resolvers: Optional core config resolvers for custom placeholders. Returns: Validated Kafka settings. Raises: loom.core.config.ConfigError: If files cannot be loaded, the section is missing, or the section fails validation. """ cfg = load_config(*config_files, resolvers=resolvers) return section(cfg, section_name, KafkaSettings)
def _merge_extra_config( config: dict[str, KafkaConfigValue], extra: dict[str, KafkaConfigValue], ) -> dict[str, KafkaConfigValue]: """Merge extra Kafka config without allowing silent key override. Args: config: Base typed configuration. extra: Extra user-provided configuration values. Raises: ValueError: If ``extra`` redefines a typed configuration key. """ duplicate_keys = set(config).intersection(extra) if duplicate_keys: ordered_keys = ", ".join(sorted(duplicate_keys)) raise ValueError(f"extra contains keys already managed by typed settings: {ordered_keys}") return {**config, **extra} def _broker_list(brokers: tuple[str, ...]) -> str: """Return Confluent bootstrap server string.""" return ",".join(brokers) def _security_config(security: KafkaSecuritySettings | None) -> dict[str, KafkaConfigValue]: """Return security config when configured.""" if security is None: return {} return security.to_confluent_config() def _without_none( values: dict[str, KafkaConfigValue | None], ) -> dict[str, KafkaConfigValue]: """Return a copy without unset optional values.""" return {key: value for key, value in values.items() if value is not None}