Source code for loom.streaming.nodes._mongo
"""MongoDB CDC input boundary."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Generic
import msgspec
from typing_extensions import TypeVar
from loom.core.model import LoomFrozenStruct, LoomStruct
from loom.core.routing import LogicalRef, as_logical_ref
from loom.streaming.mongo._event import MongoCDCEvent
from loom.streaming.nodes._shape import StreamShape
MongoCDCPayloadT = TypeVar(
"MongoCDCPayloadT",
bound=LoomStruct | LoomFrozenStruct,
default=MongoCDCEvent,
covariant=True,
)
[docs]
class FromMongoCDC(LoomFrozenStruct, Generic[MongoCDCPayloadT], frozen=True):
"""Declare a MongoDB CDC stream as a streaming input boundary.
The type parameter ``MongoCDCPayloadT`` identifies the payload type
produced by this source. It defaults to ``MongoCDCEvent`` so callers
that do not need to override the payload type can omit it.
Args:
name: Logical input reference used to resolve Mongo source config.
collections: Collection names to watch. An empty tuple means a
database-level watch.
watch_options: Driver watch kwargs resolved later by compiler/runtime.
shape: Declared source shape.
"""
name: str
collections: tuple[str, ...] = ()
watch_options: Mapping[str, object] = msgspec.field(default_factory=dict)
shape: StreamShape = StreamShape.RECORD
@property
def logical_ref(self) -> LogicalRef:
"""Return the logical input reference."""
return as_logical_ref(self.name)
__all__ = ["FromMongoCDC"]