# Source code for loom.streaming.nodes._shape
"""Logical stream shapes and explicit shape adapters."""
from __future__ import annotations
from enum import StrEnum
from typing import ClassVar
from loom.core.model import LoomFrozenStruct
# [docs]
class StreamShape(StrEnum):
"""Logical data shape at a streaming graph edge."""
RECORD = "record"
MANY = "many"
BATCH = "batch"
NONE = "none"
# [docs]
class WindowStrategy(StrEnum):
"""Windowing strategy for batch collection.
Determines how records are grouped into batches at runtime.
Values:
COLLECT: Count-and-timeout collect using processing time. Suitable
for testing and low-volume flows. Produces batches of up to
``max_records`` items or after ``timeout_ms`` milliseconds,
whichever comes first.
TUMBLING: Fixed-length event-time tumbling window driven by
``MessageMeta.produced_at_ms``. Not yet implemented.
SESSION: Event-time session window grouped by inactivity gap driven
by ``MessageMeta.produced_at_ms``. Not yet implemented.
Note:
``COLLECT`` is the only strategy available in the current adapter.
``TUMBLING`` and ``SESSION`` are forward-declared and will raise
:class:`loom.streaming.compiler._compiler.CompilationError` until implemented.
"""
COLLECT = "collect"
TUMBLING = "tumbling"
SESSION = "session"
# [docs]
class ForEach(LoomFrozenStruct, frozen=True):
    """Shape adapter that converts a ``batch`` stream into ``record`` items.

    Pattern:
        Shape adapter.

    Note:
        The adapter works purely at the shape level: it flattens batches
        back into individual records so that downstream ``RecordStep``
        nodes can consume them. It has no business semantics of its own.
    """

    # Class-level flag rather than an instance field.
    # NOTE(review): presumably read by the compiler to decide whether this
    # node may sit inside a router branch -- confirm against the consumer.
    router_branch_safe: ClassVar[bool] = True
# [docs]
class CollectBatch(LoomFrozenStruct, frozen=True):
    """Shape adapter that converts a ``record`` stream into ``batch`` groups.

    Pattern:
        Shape adapter.

    Individual records are gathered into batches before being passed to
    downstream batch-aware nodes. A batch is an **optimization grouping**,
    not a first-class semantic unit: once the batch step has run,
    downstream nodes again receive and return individual records.

    Args:
        max_records: Upper bound on the number of records in one batch.
        timeout_ms: Longest time to wait before a partial batch is
            materialized.
        window: Windowing strategy used for grouping. Defaults to
            ``WindowStrategy.COLLECT`` (processing-time count-and-timeout).
        event_time: When ``True``, ``MessageMeta.produced_at_ms`` is used as
            the event clock instead of wall time. Only valid together with
            a windowing strategy other than ``COLLECT``.

    Raises:
        ValueError: If ``max_records`` or ``timeout_ms`` is below one, or if
            ``event_time=True`` is combined with ``WindowStrategy.COLLECT``.

    Example:
        Collect up to 100 records or wait 500 ms::

            CollectBatch(max_records=100, timeout_ms=500)
    """

    max_records: int
    timeout_ms: int
    window: WindowStrategy = WindowStrategy.COLLECT
    event_time: bool = False

    # Class-level flag rather than an instance field.
    # NOTE(review): presumably read by the compiler to decide whether this
    # node may sit inside a router branch -- confirm against the consumer.
    router_branch_safe: ClassVar[bool] = True

    def __post_init__(self) -> None:
        """Reject batch-collection parameters that cannot describe a batch.

        Raises:
            ValueError: If ``max_records`` or ``timeout_ms`` is below one,
                or if ``event_time`` is set while ``window`` is
                ``WindowStrategy.COLLECT``.
        """
        # Both limits share the same lower bound; they are checked in
        # declaration order so the first offending field is the one reported.
        for field_name in ("max_records", "timeout_ms"):
            if getattr(self, field_name) < 1:
                raise ValueError(f"CollectBatch.{field_name} must be greater than zero.")

        # Event time is only constrained when it is actually requested.
        if not self.event_time:
            return
        if self.window is WindowStrategy.COLLECT:
            raise ValueError("event_time=True requires a windowing strategy other than COLLECT.")
# [docs]
class Drain(LoomFrozenStruct, frozen=True):
    """Terminal shape adapter that maps any incoming shape to ``none``.

    Pattern:
        Shape adapter.
    """

    # Class-level flag rather than an instance field.
    # NOTE(review): presumably read by the compiler to decide whether this
    # node may sit inside a router branch -- confirm against the consumer.
    router_branch_safe: ClassVar[bool] = True
# Public API of this module, listed alphabetically.
__all__ = [
    "CollectBatch",
    "Drain",
    "ForEach",
    "StreamShape",
    "WindowStrategy",
]