Source code for loom.streaming.nodes._decompose

"""Explode DSL node for typed multi-target event transformation."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, ClassVar, Generic, Protocol, TypeVar

from loom.streaming.core._message import StreamPayload

PayloadT_contra = TypeVar("PayloadT_contra", bound=StreamPayload, contravariant=True)
InT = TypeVar("InT", bound=StreamPayload)


[docs] class PayloadExpander(Protocol[PayloadT_contra]): """Expand one typed stream event into typed sub-event collections. Each key in the returned dict is the concrete type that the downstream Router uses to dispatch to the correct branch — type-keyed, not string-keyed. The expand() method must be pure: no I/O, no side effects, no knowledge of downstream sinks or storage backends. Declare ``outputs`` to allow the compiler to validate that every produced type has a matching Router branch. Example:: class StoreEventExpander(PayloadExpander[StoreEvent]): outputs: ClassVar[tuple[type, ...]] = (StoreRow, LanguageRow) @classmethod def expand(cls, event: StoreEvent) -> dict[type, list[Any]]: return { StoreRow: [StoreRow(id=event.store_id, name=event.name)], LanguageRow: [ LanguageRow(code=code) for code in event.language_codes ], } Args: PayloadT_contra: Contravariant type of the incoming stream event. """ outputs: ClassVar[tuple[type, ...]]
[docs] @classmethod def expand(cls, event: PayloadT_contra) -> dict[type, list[Any]]: """Expand one event into typed sub-event collections. Args: event: Incoming stream event to expand. Returns: Mapping from output type to list of event instances of that type. Every type in ``outputs`` must appear as a key. """ ...
[docs] @dataclass(frozen=True) class Explode(Generic[InT]): """Transformation step that fans one typed event into N typed sub-events. Explode sits upstream of a Router and has no knowledge of downstream sinks. The Router dispatches each emitted type to the matching branch — the same mechanism used for any multi-type stream. This keeps expanders and sinks fully decoupled. Can appear in Router branches when nested expansion is needed (e.g. OrderEvent → OrderLineEvent → OrderLineRow). Args: exploder: Class implementing PayloadExpander[InT]. Example:: Process( Explode(StoreEventExpander), Router({ StoreRow: Process(IntoTopic("stores.rows", payload=StoreRow)), LanguageRow: Process(IntoTopic("stores.languages", payload=LanguageRow)), }), ) """ exploder: type[PayloadExpander[InT]] router_branch_safe: ClassVar[bool] = True
__all__ = ["Explode", "PayloadExpander"]