Source code for loom.streaming.compiler._compiler

"""Streaming flow compiler: validates and compiles StreamFlow into CompiledPlan."""

from __future__ import annotations

from collections.abc import Mapping
from typing import Any

from loom.core.config import ConfigContext
from loom.streaming.compiler._bindings import resolve_flow_bindings
from loom.streaming.compiler._plan import CompilationError, CompiledPlan
from loom.streaming.compiler.phases.build_plan import build_plan
from loom.streaming.compiler.phases.validate import (
    validate_kafka,
    validate_mongo,
    validate_outputs,
    validate_resources,
    validate_shapes,
    validate_storage_sinks,
)
from loom.streaming.graph._flow import StreamFlow


[docs] def compile_flow( flow: StreamFlow[Any, Any], *, config: ConfigContext | Mapping[str, Any], ) -> CompiledPlan: """Compile a flow into an immutable plan. Args: flow: User-declared streaming flow. config: Canonical runtime config context used for binding resolution and Kafka settings extraction. Returns: Immutable :class:`CompiledPlan` ready for adapter wiring. Raises: CompilationError: If binding resolution or any validation phase fails. """ return _Compiler().compile(flow, _ensure_config_context(config))
class _Compiler: """Validates a StreamFlow and produces a CompiledPlan. Each validator is a pure function that returns a list of error strings. """ def compile(self, flow: StreamFlow[Any, Any], ctx: ConfigContext) -> CompiledPlan: """Run all validation phases then build the compiled plan.""" errors: list[str] = [] resolved_flow, binding_errors = resolve_flow_bindings(flow, ctx) errors.extend(binding_errors) if errors: raise CompilationError(errors) errors.extend(validate_kafka(resolved_flow, ctx)) errors.extend(validate_mongo(resolved_flow, ctx)) errors.extend(validate_storage_sinks(resolved_flow, ctx)) errors.extend(validate_resources(resolved_flow)) errors.extend(validate_shapes(resolved_flow)) errors.extend(validate_outputs(resolved_flow)) if errors: raise CompilationError(errors) return build_plan(resolved_flow, ctx) def _ensure_config_context( source: ConfigContext | Mapping[str, Any], ) -> ConfigContext: """Normalize supported config inputs to a :class:`ConfigContext`.""" if isinstance(source, ConfigContext): return source return ConfigContext.from_dict(source)