Source code for loom.rest.fastapi.auto

"""Automatic FastAPI app creation from YAML configuration."""

from __future__ import annotations

import sys
import warnings
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any

import msgspec
import prometheus_client
from fastapi import FastAPI
from prometheus_client import CollectorRegistry
from starlette.responses import Response

from loom.core.backend.sqlalchemy import compile_all, get_metadata, reset_registry
from loom.core.bootstrap import KernelRuntime, create_kernel
from loom.core.config import ConfigContext, ConfigKey
from loom.core.config.errors import ConfigError
from loom.core.di.container import LoomContainer
from loom.core.di.scope import Scope
from loom.core.discovery import (
    InterfacesDiscoveryEngine,
    ManifestDiscoveryEngine,
    ModulesDiscoveryEngine,
)
from loom.core.discovery.base import DiscoveryResult
from loom.core.job.service import InlineJobService, JobService
from loom.core.model import BaseModel
from loom.core.observability.config import ObservabilityConfig, PrometheusObservabilityConfig
from loom.core.observability.runtime import ObservabilityRuntime
from loom.core.repository.sqlalchemy import build_sqlalchemy_repository_registration_module
from loom.core.repository.sqlalchemy.session_manager import SessionManager
from loom.core.repository.sqlalchemy.uow import SQLAlchemyUnitOfWorkFactory
from loom.prometheus import PrometheusMetricsAdapter
from loom.prometheus.middleware import PrometheusMiddleware
from loom.rest.fastapi.app import create_fastapi_app
from loom.rest.middleware import TraceIdMiddleware


class _DiscoveryInterfaces(msgspec.Struct, kw_only=True):
    modules: list[str] = msgspec.field(default_factory=list)
    warn_recommended: bool = True


class _DiscoveryModules(msgspec.Struct, kw_only=True):
    include: list[str] = msgspec.field(default_factory=list)


class _DiscoveryManifest(msgspec.Struct, kw_only=True):
    module: str = ""


class _DiscoveryConfig(msgspec.Struct, kw_only=True):
    mode: str = "interfaces"
    interfaces: _DiscoveryInterfaces = msgspec.field(default_factory=_DiscoveryInterfaces)
    modules: _DiscoveryModules = msgspec.field(default_factory=_DiscoveryModules)
    manifest: _DiscoveryManifest = msgspec.field(default_factory=_DiscoveryManifest)


class _RestConfig(msgspec.Struct, kw_only=True):
    backend: str = "fastapi"
    title: str = "Loom API"
    version: str = "0.1.0"
    docs_url: str | None = "/docs"
    redoc_url: str | None = "/redoc"


class _AppConfig(msgspec.Struct, kw_only=True):
    name: str
    code_path: str = "src"
    discovery: _DiscoveryConfig = msgspec.field(default_factory=_DiscoveryConfig)
    rest: _RestConfig = msgspec.field(default_factory=_RestConfig)


class _DatabaseConfig(msgspec.Struct, kw_only=True):
    url: str
    echo: bool | None = None
    pool_pre_ping: bool = True


_DISCOVERY_ENGINES: dict[str, Callable[[_DiscoveryConfig], DiscoveryResult]] = {
    "interfaces": lambda cfg: InterfacesDiscoveryEngine(
        cfg.interfaces.modules,
        warn_recommended=cfg.interfaces.warn_recommended,
    ).discover(),
    "modules": lambda cfg: ModulesDiscoveryEngine(cfg.modules.include).discover(),
    "manifest": lambda cfg: ManifestDiscoveryEngine(cfg.manifest.module).discover(),
}


def _ensure_code_path(code_path: Path) -> None:
    path_str = str(code_path.resolve())
    if path_str not in sys.path:
        sys.path.insert(0, path_str)


def _register_repositories(
    session_manager: SessionManager,
    models: tuple[type[BaseModel], ...],
) -> Callable[[LoomContainer], None]:
    return build_sqlalchemy_repository_registration_module(session_manager, models)


def _build_discovery_result(discovery_cfg: _DiscoveryConfig) -> DiscoveryResult:
    engine = _DISCOVERY_ENGINES.get(discovery_cfg.mode)
    if engine is None:
        raise ValueError(f"Unsupported discovery mode: {discovery_cfg.mode!r}")
    return engine(discovery_cfg)


def _build_celery_service(
    ctx: ConfigContext,
    result: KernelRuntime,
    observability_runtime: ObservabilityRuntime | None,
) -> Any | None:
    """Return a ``CeleryJobService`` if the celery extra is installed and configured.

    Returns ``None`` when the ``loom[celery]`` extra is absent or the
    ``celery`` config section is missing / malformed, so the caller can
    fall back to :func:`_build_inline_service`.

    Args:
        ctx: Resolved configuration context.
        result: Kernel runtime carrying container, factory and executor.

    Returns:
        ``CeleryJobService`` instance, or ``None``.
    """
    try:
        from loom.celery.config import (  # type: ignore[import-untyped,unused-ignore]
            CeleryConfig as _CC,
        )
        from loom.celery.config import (
            create_celery_app,
        )
        from loom.celery.service import (
            CeleryJobService,  # type: ignore[import-untyped,unused-ignore]
        )

        celery_cfg = ctx.section(ConfigKey.CELERY, _CC)
        return CeleryJobService(
            create_celery_app(celery_cfg),
            metrics=result.metrics,
            factory=result.factory,
            executor=result.executor,
            observability_runtime=observability_runtime,
        )
    except ImportError:
        return None
    except ConfigError:
        warnings.warn(
            "Celery config section is missing or malformed — falling back to InlineJobService. "
            "Add a 'celery' section to your config or install loom[celery] to suppress this.",
            stacklevel=3,
        )
        return None


def _build_inline_service(result: KernelRuntime) -> InlineJobService:
    """Return an ``InlineJobService`` backed by the kernel's factory and executor.

    Args:
        result: Kernel runtime carrying factory and executor.

    Returns:
        ``InlineJobService`` instance.
    """
    return InlineJobService(result.factory, result.executor)


def _configure_job_service(
    ctx: ConfigContext,
    result: KernelRuntime,
    observability_runtime: ObservabilityRuntime | None,
) -> None:
    """Register a ``JobService`` implementation in the container.

    Registers :class:`~loom.celery.service.CeleryJobService` when a
    ``celery`` config section is present and the ``loom[celery]`` extra is
    installed.  Falls back to
    :class:`~loom.core.job.service.InlineJobService` otherwise — enabling
    local development and tests without a broker.

    The registration uses ``APPLICATION`` scope so the service is created
    once and shared across all requests.

    Args:
        ctx: Resolved configuration context.
        result: Kernel runtime carrying container, factory and executor.
    """
    svc = _build_celery_service(ctx, result, observability_runtime) or _build_inline_service(result)
    result.container.register(JobService, lambda: svc, scope=Scope.APPLICATION)


def _load_observability_config(ctx: ConfigContext) -> ObservabilityConfig:
    """Load top-level observability config or fall back to defaults."""
    try:
        return ctx.section(ConfigKey.OBSERVABILITY, ObservabilityConfig)
    except ConfigError:
        return ObservabilityConfig()


def _build_bootstrap(
    app_cfg: _AppConfig,
    db_cfg: _DatabaseConfig,
    echo: bool,
    metrics: Any | None = None,
) -> tuple[KernelRuntime, SessionManager, DiscoveryResult]:
    discovered = _discover_components(app_cfg)
    session_manager = _build_sqlalchemy_session_manager(db_cfg, echo)
    _compile_discovered_models(discovered)
    result = _build_kernel_runtime(app_cfg, discovered, session_manager, metrics=metrics)
    return result, session_manager, discovered


def _discover_components(app_cfg: _AppConfig) -> DiscoveryResult:
    discovered = _build_discovery_result(app_cfg.discovery)
    if not discovered.use_cases:
        raise RuntimeError("No UseCase classes discovered.")
    if not discovered.interfaces:
        raise RuntimeError("No RestInterface classes discovered.")
    if not discovered.models:
        raise RuntimeError("No BaseModel classes discovered.")
    return discovered


def _compile_discovered_models(discovered: DiscoveryResult) -> None:
    reset_registry()
    compile_all(*discovered.models)


def _build_sqlalchemy_session_manager(
    db_cfg: _DatabaseConfig,
    echo: bool,
) -> SessionManager:
    return SessionManager(
        db_cfg.url,
        echo=echo,
        pool_pre_ping=db_cfg.pool_pre_ping,
        pool_size=None,
        max_overflow=None,
        pool_timeout=None,
        pool_recycle=None,
        connect_args={},
    )


def _build_kernel_runtime(
    app_cfg: _AppConfig,
    discovered: DiscoveryResult,
    session_manager: SessionManager,
    metrics: Any | None = None,
) -> KernelRuntime:
    uow_factory = SQLAlchemyUnitOfWorkFactory(session_manager)
    return create_kernel(
        config=app_cfg,
        use_cases=discovered.use_cases,
        modules=[_register_repositories(session_manager, discovered.models)],
        uow_factory=uow_factory,
        metrics=metrics,
    )


def _build_metrics_adapter(
    cfg: PrometheusObservabilityConfig,
    registry: CollectorRegistry | None,
) -> Any | None:
    """Return a ``PrometheusMetricsAdapter`` when metrics are enabled, else ``None``.

    Args:
        cfg: Metrics feature config.
        registry: Optional Prometheus registry override.

    Returns:
        ``PrometheusMetricsAdapter`` or ``None``.
    """
    if not cfg.enabled:
        return None
    return PrometheusMetricsAdapter(registry=registry)


def _metrics_path(cfg: PrometheusObservabilityConfig) -> str:
    """Return the REST metrics path declared in the Prometheus config."""
    return cfg.config.path if cfg.config is not None else "/metrics"


def _mount_optional_middlewares(
    app: FastAPI,
    metrics_cfg: PrometheusObservabilityConfig,
    registry: CollectorRegistry | None,
) -> None:
    """Mount request tracing and metrics middlewares.

    Args:
        app: FastAPI application to mutate.
        metrics_cfg: Metrics feature config.
        registry: Optional Prometheus registry override.
    """
    app.add_middleware(TraceIdMiddleware)
    if metrics_cfg.enabled:
        _mount_metrics(app, metrics_cfg, registry)


def _mount_metrics(
    app: FastAPI,
    cfg: PrometheusObservabilityConfig,
    registry: CollectorRegistry | None,
) -> None:
    """Add Prometheus middleware and scrape endpoint to *app*.

    Args:
        app: FastAPI application to mutate.
        cfg: Metrics feature config.
        registry: Optional Prometheus registry override.
    """
    path = _metrics_path(cfg)
    if "{" in path:
        raise ValueError(f"metrics.path must not contain path parameters, got: {path!r}")
    app.add_middleware(PrometheusMiddleware, registry=registry)
    scrape_registry = registry or prometheus_client.REGISTRY

    def _scrape() -> Response:
        return Response(
            content=prometheus_client.generate_latest(scrape_registry),
            media_type=prometheus_client.CONTENT_TYPE_LATEST,
        )

    def _scrape_trailing_slash() -> Response:
        # Return 404 for trailing-slash variant to avoid ambiguous scrape targets.
        return Response(status_code=404)

    app.add_api_route(path, _scrape, methods=["GET"], include_in_schema=False)
    app.add_api_route(f"{path}/", _scrape_trailing_slash, methods=["GET"], include_in_schema=False)


[docs] def create_app( *config_paths: str, code_path: str | None = None, metrics_registry: CollectorRegistry | None = None, ) -> FastAPI: """Create a FastAPI application from one or more YAML config files. Config files are merged left-to-right — later files override earlier ones. Each file may also declare a top-level ``includes`` list to pull in additional base files before its own values (resolved by :meth:`loom.core.config.ConfigContext.from_yaml`). ``TraceIdMiddleware`` is mounted automatically. Structured logging and OTEL come from the top-level ``observability:`` section. Prometheus middleware is mounted when ``observability.prometheus.enabled`` is ``true``. Args: *config_paths: One or more paths to YAML configuration files. code_path: Optional override for ``app.code_path``. Resolved relative to the first config file when not absolute. metrics_registry: Optional Prometheus ``CollectorRegistry`` used for ``PrometheusMiddleware`` and the scrape endpoint. Defaults to the global registry. Pass a fresh ``CollectorRegistry()`` in tests to avoid ``ValueError: Duplicated timeseries`` when multiple apps with ``observability.prometheus.enabled: true`` are created in the same process. Returns: Configured :class:`fastapi.FastAPI` application, ready to serve. Example — single config:: app = create_app("config/app.yaml") Example — base + environment override:: app = create_app("config/base.yaml", "config/production.yaml") Example — single file using inline includes:: # config/app.yaml # includes: # - base.yaml # - secrets.yaml app = create_app("config/app.yaml") """ if not config_paths: raise ConfigError("create_app requires at least one config file path.") ctx = ConfigContext.from_yaml(*config_paths) app_cfg = ctx.section(ConfigKey.APP, _AppConfig) db_cfg = ctx.section(ConfigKey.DATABASE, _DatabaseConfig) observability_cfg = _load_observability_config(ctx) observability_runtime = ObservabilityRuntime.from_config(observability_cfg) metrics_cfg = observability_cfg.prometheus config_file = Path(config_paths[0]).resolve() effective_code_path = Path(code_path) if code_path is not None else Path(app_cfg.code_path) if not effective_code_path.is_absolute(): effective_code_path = (config_file.parent / effective_code_path).resolve() _ensure_code_path(effective_code_path) effective_echo = db_cfg.echo if db_cfg.echo is not None else False metrics_adapter = _build_metrics_adapter(metrics_cfg, metrics_registry) result, session_manager, discovered = _build_bootstrap( app_cfg, db_cfg, effective_echo, metrics=metrics_adapter, ) _configure_job_service(ctx, result, observability_runtime) @asynccontextmanager async def lifespan(_: FastAPI) -> AsyncIterator[None]: async with session_manager.engine.begin() as connection: await connection.run_sync(get_metadata().create_all) try: yield finally: await session_manager.dispose() reset_registry() app = create_fastapi_app( result, interfaces=tuple(type_i for type_i in discovered.interfaces), observability_runtime=observability_runtime, title=app_cfg.rest.title, version=app_cfg.rest.version, docs_url=app_cfg.rest.docs_url, redoc_url=app_cfg.rest.redoc_url, lifespan=lifespan, ) _mount_optional_middlewares(app, metrics_cfg, metrics_registry) return app