Streaming Quickstart

loom.streaming lets you declare typed streaming flows with explicit sources, steps, batching, branching, and transport wiring.

The companion demo repository for this subsystem is dummy-loom-streaming.

Install

Choose the streaming runtime and Kafka transport:

pip install "loom-kernel[streaming,kafka]"

Minimal flow

from loom.streaming import (
    CollectBatch,
    FromTopic,
    IntoTopic,
    Process,
    StreamFlow,
    WithAsync,
    payload,
)

from app.streaming.tasks import FetchRequestTask
from app.streaming.types import ScrapeRequest, ScrapeResponse


async_scrape_flow = StreamFlow(
    name="async_smoke_flow",
    source=FromTopic[ScrapeRequest](
        name="scrape.requests",
        payload=ScrapeRequest,
    ),
    process=Process(
        CollectBatch(max_records=50, timeout_ms=2000),
        WithAsync(
            process=Process(
                FetchRequestTask,
                IntoTopic[ScrapeResponse](
                    name="scrape.responses",
                    payload=ScrapeResponse,
                ),
            ),
            max_concurrency=50,
        ),
    ),
)

Run it

Use the Bytewax runner to compile the flow, resolve YAML bindings, and execute the graph:

from loom.streaming.bytewax import StreamingRunner

runner = StreamingRunner.from_yaml(async_scrape_flow, "config/streaming.yaml")
runner.run()

Public imports

Prefer the public surface:

from loom.streaming import (
    Broadcast,
    CollectBatch,
    ContextFactory,
    Fork,
    FromTopic,
    IntoTopic,
    Message,
    Process,
    RecordStep,
    Router,
    StreamFlow,
    With,
    WithAsync,
    msg,
    payload,
)

Use loom.streaming.nodes, loom.streaming.kafka, and loom.streaming.bytewax only when you need a narrower or runtime-specific surface.

For a runnable end-to-end example, see the companion repository: dummy-loom-streaming.