loom.streaming.testing

Streaming testing helpers built on top of Bytewax testing sinks and sources.

Public API

  • StreamingTestRunner — test-oriented runner with injectable input, captured output, and explicit error branch capture.

Typical usage:

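from loom.streaming.testing import StreamingTestRunner

# flow, cfg, OrderPlaced, and ErrorKind are assumed to be defined or imported elsewhere.
runner = StreamingTestRunner.from_flow(flow, config=cfg)
runner.with_payloads([OrderPlaced(order_id="o-1")])
runner.capture_errors(ErrorKind.WIRE)
runner.run()
assert len(runner.output) == 1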

Functions

_ensure_config_context(source)

Normalize supported config inputs to a ConfigContext.

_source_topic_or_none(plan)

Return the first source topic when the compiled source exposes one.

_test_message(topic, idx, payload)

Classes

StreamingTestRunner(plan[, ...])

Run a streaming flow with test doubles for input and output.

class loom.streaming.testing.StreamingTestRunner(plan, observability_runtime=None)

Bases: object

Run a streaming flow with test doubles for input and output.

The runner mirrors the production runner’s dataflow preparation and swaps only the source and sinks for Bytewax testing primitives. Use with_payloads() for ergonomic flow-author tests and with_messages() when metadata must be controlled explicitly.

Parameters:
  • plan (CompiledPlan) – Compiled plan produced by the compiler.

  • observability_runtime (ObservabilityRuntime | None) – Optional flow observer.
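
The idea of swapping only the source and sinks for test doubles, as described in the class summary, can be pictured with a small standalone sketch. It does not use loom or Bytewax: run_pipeline and double_amount are hypothetical names, and a plain list stands in for both the testing source and the testing sink.

```python
from typing import Any, Callable, Iterable


def double_amount(item: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for the flow logic under test."""
    return {**item, "amount": item["amount"] * 2}


def run_pipeline(
    source: Iterable[Any],
    step: Callable[[Any], Any],
    sink: list[Any],
) -> None:
    """Push every item from the source through the step and into the sink."""
    for item in source:
        sink.append(step(item))


# In a test, the source is an in-memory list and the sink is a list to inspect.
test_input = [{"order_id": "o-1", "amount": 5}]
captured: list[Any] = []
run_pipeline(test_input, double_amount, captured)
```

The processing step is unchanged between production and test; only the ends of the pipeline differ, which is the property the runner is documented to preserve.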

classmethod from_flow(flow, *, config, observability_runtime=None)

Compile a flow and build a test runner from resolved config.

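Parameters:
  • flow – Flow to compile.

  • config – Config used to build the runner.

  • observability_runtime (ObservabilityRuntime | None)
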
Return type:

StreamingTestRunner

classmethod from_yaml(flow, path, *, observability_runtime=None)

Load YAML config, compile the flow, and build a test runner.

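Parameters:
  • flow – Flow to compile.

  • path – Path to the YAML config.

  • observability_runtime (ObservabilityRuntime | None)
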
Return type:

StreamingTestRunner

classmethod from_dict(flow, config, *, observability_runtime=None)

Build a test runner from a plain Python config mapping.

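Parameters:
  • flow – Flow to compile.

  • config – Plain Python config mapping.

  • observability_runtime (ObservabilityRuntime | None)
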
Return type:

StreamingTestRunner

with_payloads(items)

Replace input with payload-derived test messages.

Payloads are wrapped into loom.streaming.Message values using deterministic test metadata:

  • message_id: test-<index>

  • topic: first source topic declared by the flow

Parameters:

items (list[Any])

Return type:

StreamingTestRunner
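
The wrapping that with_payloads() describes can be sketched roughly as follows. This is an illustration only, not the library's implementation: FakeMessage is a hypothetical stand-in for loom.streaming.Message, wrap_payloads is a made-up helper, the topic "orders" is invented, and the index is assumed to start at 0, which the documentation does not state.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class FakeMessage:
    """Hypothetical stand-in for loom.streaming.Message; real fields may differ."""

    message_id: str
    topic: Optional[str]
    payload: Any


def wrap_payloads(payloads: list[Any], topic: Optional[str]) -> list[FakeMessage]:
    """Attach deterministic test metadata to each payload (zero-based index assumed)."""
    return [
        FakeMessage(message_id=f"test-{idx}", topic=topic, payload=payload)
        for idx, payload in enumerate(payloads)
    ]


messages = wrap_payloads([{"order_id": "o-1"}, {"order_id": "o-2"}], topic="orders")
```

Because the metadata is derived only from position and the source topic, two runs over the same payload list produce identical messages, which keeps assertions on message_id stable.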

with_messages(items)

Replace input with fully formed runtime messages or raw test records.

Parameters:

items (list[Any])

Return type:

StreamingTestRunner

capture_errors(*kinds)

Enable capture for explicit error branches.

Parameters:

kinds (ErrorKind)

Return type:

StreamingTestRunner

reset()

Clear captured input, output, and error buffers.

Return type:

StreamingTestRunner
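
Since the configuration methods and reset() are documented as returning a StreamingTestRunner, calls can presumably be chained and a runner reused between cases. The sketch below shows that pattern with a hypothetical FakeRunner; it assumes each method returns the same instance, which the documentation does not confirm, and its run() simply copies input to output.

```python
from typing import Any


class FakeRunner:
    """Hypothetical stand-in that mimics the fluent shape of the runner."""

    def __init__(self) -> None:
        self._input: list[Any] = []
        self.output: list[Any] = []

    def with_payloads(self, items: list[Any]) -> "FakeRunner":
        # Replace (not extend) the input, as the method summary suggests.
        self._input = list(items)
        return self

    def reset(self) -> "FakeRunner":
        # Clear input and output buffers so the next case starts clean.
        self._input.clear()
        self.output.clear()
        return self

    def run(self) -> None:
        # Identity "flow": every input item is written to the output buffer.
        self.output.extend(self._input)


runner = FakeRunner().with_payloads(["a", "b"])
runner.run()
first_run = list(runner.output)

runner.reset().with_payloads(["c"])
runner.run()
second_run = list(runner.output)
```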

property output: list[Any]

Captured items written to the main testing sink.

property errors: dict[ErrorKind, list[Any]]

Captured items written to configured error sinks.
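
The dict[ErrorKind, list[Any]] shape of errors can be illustrated with a standalone sketch. FakeErrorKind is a hypothetical stand-in for ErrorKind: only WIRE appears in this page's usage example, and OTHER is invented here for contrast. Whether the real property contains keys for kinds that were never passed to capture_errors() is not documented, so the sketch reads with .get() to stay safe either way.

```python
from enum import Enum
from typing import Any


class FakeErrorKind(Enum):
    """Hypothetical stand-in for ErrorKind."""

    WIRE = "wire"
    OTHER = "other"  # invented member, for illustration only


# One buffer per captured kind; here only WIRE was enabled.
errors: dict[FakeErrorKind, list[Any]] = {FakeErrorKind.WIRE: []}
errors[FakeErrorKind.WIRE].append({"raw": b"\xff", "reason": "could not decode"})


def captured(kind: FakeErrorKind) -> list[Any]:
    """Return the items captured for a kind, or an empty list if none."""
    return errors.get(kind, [])


wire_failures = captured(FakeErrorKind.WIRE)
other_failures = captured(FakeErrorKind.OTHER)
```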

run()

Execute the compiled dataflow with testing source and sinks.

Return type:

None