Source code for loom.streaming.mongo._config
"""Typed MongoDB CDC configuration contracts."""
from __future__ import annotations
import re
from collections.abc import Mapping
from typing import Literal
import msgspec
from loom.core.model import LoomFrozenStruct
from loom.core.routing import DefaultingRouteResolver, LogicalRef
_URI_CREDENTIALS_RE = re.compile(r"://[^@]+@")
def _redact_uri(uri: str) -> str:
return _URI_CREDENTIALS_RE.sub("://***@", uri)
[docs]
class MongoSourceConfig(LoomFrozenStruct, frozen=True, kw_only=True):
"""Connection settings for one MongoDB CDC source."""
uri: str
database: str
watch_options: Mapping[str, object] = msgspec.field(default_factory=dict)
server_api_version: str | None = None
on_oplog_expired: Literal["fail", "restart_from_now"] = "fail"
def __repr__(self) -> str:
return (
f"MongoSourceConfig(uri={_redact_uri(self.uri)!r},"
f" database={self.database!r},"
f" server_api_version={self.server_api_version!r},"
f" on_oplog_expired={self.on_oplog_expired!r})"
)
[docs]
class MongoConfig(LoomFrozenStruct, frozen=True, kw_only=True):
"""Top-level MongoDB settings loaded from the ``mongo`` config section."""
source: MongoSourceConfig | None = None
sources: dict[str, MongoSourceConfig] = msgspec.field(default_factory=dict)
[docs]
def source_for(self, ref: str | LogicalRef) -> MongoSourceConfig:
"""Resolve source settings by logical reference with default fallback."""
resolver = DefaultingRouteResolver(
default=self.source,
overrides=self.sources,
kind="Mongo source config",
)
return resolver.resolve(ref)
__all__ = ["MongoConfig", "MongoSourceConfig"]