Quickstart¶
loom-kernel lets you define business behavior in typed use cases and expose it
through REST interfaces — with full auto-CRUD, background jobs, and Celery workers.
Install¶
pip install "loom-kernel[rest,sqlalchemy,cache,celery]"
60-second demo: full CRUD from a model¶
Define a model and set auto = True on a RestInterface. That is the entire
application layer:
from loom.core.model import ColumnField, TimestampedModel
from loom.rest.model import PaginationMode, RestInterface
class Product(TimestampedModel):
__tablename__ = "products"
id: int = ColumnField(primary_key=True, autoincrement=True)
sku: str = ColumnField(length=64, unique=True, index=True)
name: str = ColumnField(length=150)
price_cents: int = ColumnField()
stock: int = ColumnField()
class ProductInterface(RestInterface[Product]):
prefix = "/products"
tags = ("Products",)
auto = True
pagination_mode = PaginationMode.CURSOR
Wire it up:
# config/api.yaml
app:
name: my_store
code_path: src
discovery:
mode: modules
modules:
include:
- app.product.model
- app.product.interface
rest:
backend: fastapi
title: My Store API
version: 0.1.0
database:
url: ${oc.env:DATABASE_URL,sqlite+aiosqlite:///store.db}
# main.py
from loom.rest.fastapi.auto import create_app
app = create_app("config/api.yaml")
Five endpoints (GET /, GET /:id, POST /, PATCH /:id, DELETE /:id) are
live with zero hand-written use-case code.
Add business logic¶
Use cases declare inputs and invariants declaratively. The engine resolves them
before execute() runs.
import re
from loom.core.command import Command, Patch
from loom.core.errors import NotFound
from loom.core.use_case import Exists, F, Input, LoadById, OnMissing, Rule
from loom.core.use_case.use_case import UseCase
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
class CreateUser(Command, frozen=True):
full_name: str
email: str
class UpdateUser(Command, frozen=True):
full_name: Patch[str] = None
email: Patch[str] = None
def _name_must_not_be_blank(full_name: str) -> str | None:
return None if full_name.strip() else "full_name must not be blank"
def _email_must_be_valid(email: str) -> str | None:
return None if _EMAIL_RE.fullmatch(email) else "email must be valid"
def _email_is_taken(cmd: CreateUser, fields_set: frozenset[str], email_exists: bool) -> bool:
return email_exists
class CreateUserUseCase(UseCase[User, User]):
rules = [
Rule.check(F(CreateUser).full_name, via=_name_must_not_be_blank),
Rule.check(F(CreateUser).email, via=_email_must_be_valid),
Rule.forbid(_email_is_taken, message="email already exists").from_params("email_exists"),
]
async def execute(
self,
cmd: CreateUser = Input(),
email_exists: bool = Exists(User, from_command="email", against="email"),
) -> User:
return await self.main_repo.create(cmd)
class UpdateUserUseCase(UseCase[User, User | None]):
rules = [Rule.check(F(UpdateUser).full_name, via=_name_must_not_be_blank).when_present(F(UpdateUser).full_name)]
async def execute(
self,
user_id: int,
cmd: UpdateUser = Input(),
current_user: User = LoadById(User, by="user_id"),
) -> User | None:
return await self.main_repo.update(user_id, cmd)
Exists checks a DB condition before execute() runs — no boilerplate in the body.
LoadById fetches an entity by a path/command parameter, available in rules and the body.
Patch[T] marks a field as optional in partial updates; .when_present(...) gates rules on whether the field was sent.
Scope resources under a parent¶
Use from_param to guard nested routes (e.g. /users/{user_id}/addresses/{address_id}):
from loom.core.use_case import Exists, Input, OnMissing
class CreateAddressUseCase(UseCase[Address, Address]):
async def execute(
self,
user_id: int,
cmd: CreateUserAddress = Input(),
_user_exists: bool = Exists(User, from_param="user_id", against="id", on_missing=OnMissing.RAISE),
) -> Address:
return await self.main_repo.create(CreateAddressRecord(user_id=user_id, **cmd.__dict__))
OnMissing.RAISE returns a structured 404 automatically — no if in the body.
Structured queries¶
Build explicit queries without raw SQL:
from loom.core.repository.abc.query import (
FilterGroup, FilterOp, FilterSpec, PageResult, PaginationMode, QuerySpec, SortSpec,
)
class ListLowStockProductsUseCase(UseCase[Product, PageResult[Product]]):
async def execute(self, profile: str = "default") -> PageResult[Product]:
query = QuerySpec(
filters=FilterGroup(filters=(FilterSpec(field="stock", op=FilterOp.LTE, value=5),)),
sort=(SortSpec(field="stock", direction="ASC"),),
pagination=PaginationMode.OFFSET,
limit=20,
page=1,
)
result = await self.main_repo.list_with_query(query, profile=profile)
if not isinstance(result, PageResult):
raise RuntimeError("expected offset result")
return result
Background jobs¶
Jobs are use-case-like executors that run in a Celery queue. LoadById works the same way:
from loom.core.job.job import Job
from loom.core.use_case import Input, LoadById
class SendRestockEmailJob(Job[bool]):
__queue__ = "notifications"
async def execute(
self,
product_id: int,
cmd: SendRestockEmailCommand = Input(),
product: Product = LoadById(Product, by="product_id"),
) -> bool:
if product.stock > 0:
return False
# send email to cmd.recipient_email …
return True
Dispatch jobs from use cases + callbacks¶
from loom.core.job.service import JobService
class DispatchRestockEmailUseCase(UseCase[Product, DispatchRestockEmailResponse]):
def __init__(self, job_service: JobService) -> None:
self._jobs = job_service
async def execute(self, product_id: str, cmd: DispatchRestockEmailCommand = Input()) -> DispatchRestockEmailResponse:
handle = self._jobs.dispatch(
SendRestockEmailJob,
params={"product_id": int(product_id)},
payload={"product_id": int(product_id), "recipient_email": cmd.recipient_email},
on_success=RestockEmailSuccessCallback,
on_failure=RestockEmailFailureCallback,
)
return DispatchRestockEmailResponse(job_id=handle.job_id, queue=handle.queue)
Callbacks are resolved by the DI container and receive the job result + context:
class RestockEmailSuccessCallback:
def __init__(self, app: ApplicationInvoker) -> None:
self._app = app
async def on_success(self, job_id: str, result: Any, **context: Any) -> None:
if not result:
return
entity = self._app.entity(Product)
product = await entity.get(params={"id": context["product_id"]})
if product:
await entity.update(params={"id": product.id}, payload={"category": f"{product.category}-notified"})
Chain use cases (workflow pattern)¶
ApplicationInvoker lets a use case call another use case by type — no tight coupling:
from loom.core.use_case.invoker import ApplicationInvoker
class RestockWorkflowUseCase(UseCase[Product, RestockWorkflowResponse]):
def __init__(self, app: ApplicationInvoker, job_service: JobService) -> None:
self._app = app
self._jobs = job_service
async def execute(self, product_id: str, cmd: DispatchRestockEmailCommand = Input()) -> RestockWorkflowResponse:
summary = await self._app.invoke(BuildProductSummaryUseCase, params={"product_id": int(product_id)})
handle = self._jobs.dispatch(SendRestockEmailJob, params={"product_id": int(product_id)}, payload={...})
return RestockWorkflowResponse(summary=summary.summary, restock_job_id=handle.job_id, queue=handle.queue)
Declare REST interfaces¶
from loom.rest.autocrud import build_auto_routes
from loom.rest.model import PaginationMode, RestInterface, RestRoute
class ProductRestInterface(RestInterface[Product]):
prefix = "/products"
tags = ("Products",)
pagination_mode = PaginationMode.CURSOR
routes = (
RestRoute(use_case=ListLowStockProductsUseCase, method="GET", path="/low-stock",
summary="List low stock products"),
RestRoute(use_case=DispatchRestockEmailUseCase, method="POST",
path="/{product_id}/jobs/restock-email", status_code=202,
summary="Dispatch restock email"),
RestRoute(use_case=RestockWorkflowUseCase, method="POST",
path="/{product_id}/workflows/restock", status_code=202,
summary="Run restock workflow"),
*build_auto_routes(Product, ()), # adds GET, POST, PATCH, DELETE automatically
)
Nested resource interfaces work the same way — routes mirror the URL hierarchy:
class AddressRestInterface(RestInterface[Address]):
prefix = "/users"
tags = ("UserAddresses",)
routes = (
RestRoute(use_case=CreateAddressUseCase, method="POST", path="/{user_id}/addresses/", status_code=201),
RestRoute(use_case=ListAddressesUseCase, method="GET", path="/{user_id}/addresses/"),
RestRoute(use_case=GetAddressUseCase, method="GET", path="/{user_id}/addresses/{address_id}"),
RestRoute(use_case=UpdateAddressUseCase, method="PATCH", path="/{user_id}/addresses/{address_id}"),
RestRoute(use_case=DeleteAddressUseCase, method="DELETE", path="/{user_id}/addresses/{address_id}"),
)
Bootstrap with YAML¶
The create_app() factory wires everything — DB, cache, DI, routes — from a YAML config:
# config/api.yaml
app:
name: my_store
code_path: src
discovery:
mode: modules
modules:
include:
- app.user.model
- app.user.interface
- app.product.model
- app.product.interface
rest:
backend: fastapi
title: My Store API
version: 0.1.0
database:
url: ${oc.env:DATABASE_URL,sqlite+aiosqlite:///store.db}
metrics:
enabled: true
path: /metrics
trace:
enabled: true
# main.py — 3 lines
from loom.rest.fastapi.auto import create_app
app = create_app("config/api.yaml")
For larger projects, use mode: manifest and a manifest module:
# app/manifest.py
from app.user.model import User
from app.user.interface import UserRestInterface
MODELS = [User, ...]
INTERFACES = [UserRestInterface, ...]
discovery:
mode: manifest
manifest:
module: app.manifest
Rules + Computes (advanced)¶
For compute-heavy write flows, declare field derivations and run them before rules:
from loom.core.use_case import Compute, F
def _normalize_email(email: str) -> str:
return email.strip().lower()
def _compute_subtotal(unit_price: float, quantity: int) -> float:
return unit_price * quantity
def _compute_tax(subtotal: float, tax_rate: float) -> float:
return subtotal * tax_rate
def _unit_price_invalid(unit_price: float) -> bool:
return unit_price <= 0
def _country_unsupported(country: str) -> bool:
return country not in TAX_RATES
class PricingPreviewUseCase(UseCase[Record, PricingPreviewResponse]):
computes = (
Compute.set(F(PricingCommand).normalized_email).from_command(
F(PricingCommand).email, via=_normalize_email,
),
Compute.set(F(PricingCommand).subtotal).from_command(
F(PricingCommand).unit_price, F(PricingCommand).quantity,
via=_compute_subtotal,
),
Compute.set(F(PricingCommand).tax_amount).from_command(
F(PricingCommand).subtotal, F(PricingCommand).tax_rate,
via=_compute_tax,
),
)
rules = (
Rule.check(F(PricingCommand).unit_price, via=_unit_price_invalid, message="unit_price must be > 0"),
Rule.check(F(PricingCommand).country, via=_country_unsupported, message="Unsupported country"),
)
async def execute(self, record_id: int, cmd: PricingCommand = Input()) -> PricingPreviewResponse:
...
Computes run in declaration order — later computes can reference fields set by earlier ones.
Next steps¶
Auto-CRUD guide — full options reference for
auto = Trueandbuild_auto_routes()Use-case DSL — rules, computes, predicates
Celery worker — job definitions, dispatch, callbacks, worker bootstrap
Examples repo — full walkthrough of the
dummy-loomcompanion app