Celery worker¶
loom-kernel bootstraps Celery workers from the same config and discovery system used
by the REST API. Jobs are first-class citizens — they support Input(), LoadById(),
Rule, and full DI injection just like use cases.
Install¶
```shell
pip install "loom-kernel[celery]"
```
Core concepts¶
| Concept | Purpose |
|---|---|
| `Job` | Background unit of work — runs asynchronously on a worker |
| `JobService.dispatch()` | Enqueue a job after the current UoW commits (fire-and-forget) |
| `JobService.run()` | Execute a job immediately in the calling process |
| Callbacks (`on_success` / `on_failure`) | Hook invoked by the broker on task success or failure |
| `bootstrap_worker()` | Single entry point to start a Celery worker |
| `celery` config section | YAML-driven broker and worker settings |
| `jobs` config section | Per-job routing and retry overrides from YAML |
Define a job¶
A Job declares its queue via __queue__ and implements execute().
LoadById loads an entity from the repository before execute() runs — same
pattern as use cases:
```python
# app/product/jobs.py
from loom.core.command import Command
from loom.core.job.job import Job
from loom.core.use_case import Input, LoadById

from app.product.model import Product


class SendRestockEmailJobCommand(Command, frozen=True):
    product_id: int
    recipient_email: str


class SendRestockEmailJob(Job[bool]):
    """Send a restock notification email. Returns True when email was sent."""

    __queue__ = "notifications"
    __retries__ = 2
    __timeout__ = 30  # soft time limit in seconds

    async def execute(
        self,
        product_id: int,
        cmd: SendRestockEmailJobCommand = Input(),
        product: Product = LoadById(Product, by="product_id"),
    ) -> bool:
        if product.stock > 0:
            return False
        # send email to cmd.recipient_email …
        return True
```
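Celery enforces these limits broker-side; as a mental model only, the combined `__retries__` / `__countdown__` / `__timeout__` semantics can be sketched in plain asyncio (`run_with_retries` is an illustrative helper, not a loom-kernel API):

```python
import asyncio


async def run_with_retries(fn, *, retries: int, countdown: float, timeout: float):
    """Illustrative semantics of the Job ClassVars — not loom-kernel internals."""
    await asyncio.sleep(countdown)            # __countdown__: delay before the first attempt
    for attempt in range(retries + 1):        # first run + up to __retries__ retries
        try:
            # __timeout__: soft time limit applied per attempt
            return await asyncio.wait_for(fn(), timeout=timeout)
        except Exception:
            if attempt == retries:
                raise                         # retries exhausted — the task fails
```

In real deployments the retry delay and time-limit enforcement come from Celery itself; this sketch only shows how the three knobs interact.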
Job ClassVars reference¶
| ClassVar | Type | Default | Description |
|---|---|---|---|
| `__queue__` | `str` | — | Target Celery queue |
| `__retries__` | `int` | — | Max automatic retries on failure |
| `__countdown__` | `int` | — | Delay in seconds before first execution |
| `__timeout__` | `int` | — | Soft time limit in seconds |
| `__priority__` | `int` | — | Task priority (broker-dependent) |
All ClassVars can be overridden per-environment from YAML without touching code — see jobs section — per-job overrides.
Dispatch a job¶
Use JobService.dispatch() inside a use case to enqueue a job after the current
Unit of Work commits. The job is not sent if the UoW rolls back — dispatch is
transactionally safe:
```python
# app/product/use_cases.py
from loom.core.job.service import JobService
from loom.core.use_case.use_case import UseCase
from loom.core.use_case import Input, LoadById

from app.product.model import Product
from app.product.jobs import SendRestockEmailJob
from app.product.callbacks import RestockEmailSuccessCallback, RestockEmailFailureCallback


class DispatchRestockEmailUseCase(UseCase[Product, DispatchRestockEmailResponse]):
    def __init__(self, job_service: JobService) -> None:
        self._jobs = job_service

    async def execute(
        self,
        product_id: str,
        cmd: DispatchRestockEmailCommand = Input(),
        product: Product = LoadById(Product, by="product_id"),
    ) -> DispatchRestockEmailResponse:
        handle = self._jobs.dispatch(
            SendRestockEmailJob,
            params={"product_id": int(product_id)},
            payload={
                "product_id": int(product_id),
                "recipient_email": cmd.recipient_email,
            },
            on_success=RestockEmailSuccessCallback,
            on_failure=RestockEmailFailureCallback,
        )
        # handle.job_id is a stable UUID you can return to the caller
        return DispatchRestockEmailResponse(job_id=handle.job_id, queue=handle.queue)
```
dispatch() returns a JobHandle immediately — the broker call happens after the
UoW commits. The HTTP response does not wait for the job to complete.
dispatch() parameters¶
| Parameter | Type | Description |
|---|---|---|
| `job` | `type[Job]` | Concrete `Job` subclass to enqueue |
| `params` | `dict` | Path/query params for `execute()` |
| `payload` | `dict` | JSON body passed as the `Input()` command |
| `on_success` | — | Callback invoked by the broker on success |
| `on_failure` | — | Callback invoked by the broker on failure |
| `queue` | `str` | Override the job's `__queue__` |
| `countdown` | `int` | Delay in seconds before execution |
| `retries` | `int` | Override the job's `__retries__` |
| `eta` | `datetime` | Absolute datetime for first execution |
Run a job inline¶
JobService.run() executes a job immediately in the calling process and returns
the result. Use it when the use case needs the output before responding — for example
in a synchronous workflow step:
```python
class BuildProductSummaryUseCase(UseCase[Product, ProductSummaryResponse]):
    def __init__(self, job_service: JobService) -> None:
        self._jobs = job_service

    async def execute(self, product_id: str) -> ProductSummaryResponse:
        # runs immediately, no broker involved
        summary = await self._jobs.run(
            BuildProductSummaryJob,
            params={"product_id": int(product_id)},
        )
        return ProductSummaryResponse(product_id=int(product_id), summary=summary)
```
run() works identically in both InlineJobService (development / tests) and
CeleryJobService (production). When Celery is configured, it bypasses the broker
entirely — the job runs in the API process via the same DI container.
Use dispatch() for fire-and-forget work; use run() when you need the result now.
Callbacks¶
Callbacks are injected by the DI container and can interact with the application
via ApplicationInvoker. Implement on_success and/or on_failure — both can be
sync or async:
```python
# app/product/callbacks.py
from typing import Any

from loom.core.use_case.invoker import ApplicationInvoker

from app.product.model import Product


class RestockEmailSuccessCallback:
    """Tags the product category when a restock email is sent successfully."""

    def __init__(self, app: ApplicationInvoker) -> None:
        self._app = app

    async def on_success(self, job_id: str, result: Any, **context: Any) -> None:
        if not bool(result):
            return  # job ran but nothing was sent
        product_id = context.get("product_id")
        entity = self._app.entity(Product)
        product = await entity.get(params={"id": product_id})
        if product:
            await entity.update(
                params={"id": product_id},
                payload={"category": f"{product.category}-restock-notified"},
            )

    def on_failure(self, job_id: str, exc_type: str, exc_msg: str, **context: Any) -> None:
        pass  # log or send alert


class RestockEmailFailureCallback:
    """Tags the product category when restock email dispatch fails."""

    def __init__(self, app: ApplicationInvoker) -> None:
        self._app = app

    def on_success(self, job_id: str, result: Any, **context: Any) -> None:
        pass

    async def on_failure(
        self, job_id: str, exc_type: str, exc_msg: str, **context: Any
    ) -> None:
        product_id = context.get("product_id")
        entity = self._app.entity(Product)
        product = await entity.get(params={"id": product_id})
        if product:
            await entity.update(
                params={"id": product_id},
                payload={"category": f"{product.category}-restock-failed"},
            )
```
Callback method signatures¶
```python
# called on task success — result is the value returned by Job.execute()
async def on_success(self, job_id: str, result: Any, **context: Any) -> None: ...

# called on task failure — exc_type / exc_msg describe the exception
async def on_failure(self, job_id: str, exc_type: str, exc_msg: str, **context: Any) -> None: ...
```
**context contains the raw payload dict passed to dispatch() — use it to
recover entity IDs and other parameters the callback needs.
Fire-and-forget: when a job dispatched via `dispatch()` fails, the failure callback is invoked and the exception is silenced. The HTTP layer is not affected. Use `run()` when you need the exception to propagate synchronously.
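The swallow-on-failure behaviour described in this note can be sketched as follows (`run_fire_and_forget` is an illustrative helper, not a loom-kernel function):

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_fire_and_forget(
    job: Callable[[], Awaitable[Any]],
    on_failure: Callable[..., Awaitable[None]],
    **context: Any,
):
    """Illustrative fire-and-forget semantics — not loom-kernel internals."""
    try:
        return await job()
    except Exception as exc:
        # The failure reaches the callback; the exception never propagates
        # to the caller (or the HTTP layer).
        await on_failure(
            job_id="<job-id>",  # placeholder — the real id comes from the broker
            exc_type=type(exc).__name__,
            exc_msg=str(exc),
            **context,
        )
        return None
```

Contrast with `run()`, where the same exception would surface directly in the calling use case.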
YAML configuration reference¶
celery section¶
```yaml
# config/celery.yaml
celery:
  broker_url: ${oc.env:CELERY_BROKER_URL,redis://redis:6379/1}
  result_backend: ${oc.env:CELERY_RESULT_BACKEND,redis://redis:6379/2}

  # Worker pool
  worker_concurrency: 4             # concurrent worker processes (default: 4)
  worker_prefetch_multiplier: 1     # 1 = fair dispatch, avoids starvation (default: 1)
  worker_max_tasks_per_child: 1000  # recycle worker after N tasks, prevents leaks (default: 1000)

  # Serialization
  task_serializer: json      # default: json
  result_serializer: json    # default: json
  accept_content: [json]     # default: [json]

  # Time
  timezone: UTC
  enable_utc: true

  # Queues
  # Declare explicit queues so the worker knows which to consume.
  # If omitted, Celery creates queues on demand.
  queues: [default, notifications, analytics, erp]

  # task_default_queue: queue used for tasks/callbacks without an explicit queue=.
  # MUST be one of 'queues' — the framework raises ValueError at startup otherwise.
  # Callbacks (on_success / on_failure) land here when no queue is set.
  task_default_queue: default

  # For integration tests: run tasks synchronously in the calling process.
  task_always_eager: false
```
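The `task_default_queue` startup check mentioned above boils down to a membership test. A sketch of the assumed shape (`validate_queue_config` is not the framework's actual function):

```python
def validate_queue_config(queues: list[str], task_default_queue: str) -> None:
    """Sketch of the startup validation described above — assumed shape only.

    The default queue must be one of the declared queues, otherwise tasks and
    callbacks without an explicit queue= would land in a queue no worker consumes.
    """
    if task_default_queue not in queues:
        raise ValueError(
            f"task_default_queue {task_default_queue!r} is not in queues {queues!r}"
        )
```

Failing fast at startup is preferable here: a mismatch would otherwise only show up as silently unconsumed messages.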
jobs section — per-job overrides¶
Override any Job ClassVar from YAML without touching Python code. Only the fields you declare are applied; unset fields keep their ClassVar defaults:
```yaml
# config/worker.yaml
jobs:
  SendRestockEmailJob:
    queue: notifications  # overrides __queue__ = "notifications"
    retries: 2            # overrides __retries__ = 2
    timeout: 30           # overrides __timeout__ = 30 (soft time limit)
  BuildProductSummaryJob:
    queue: analytics
    retries: 1
  SyncProductToErpJob:
    queue: erp
    retries: 3
    countdown: 5          # delay 5 seconds before first attempt
    priority: 5
```
| Field | Job ClassVar | Description |
|---|---|---|
| `queue` | `__queue__` | Target Celery queue |
| `retries` | `__retries__` | Max automatic retries |
| `countdown` | `__countdown__` | Delay in seconds before execution |
| `timeout` | `__timeout__` | Soft time limit in seconds |
| `priority` | `__priority__` | Task priority |
Worker bootstrap¶
Option 1: Discovery from YAML (recommended)¶
Declare app.discovery in worker.yaml and let the framework discover jobs,
use cases, and interfaces automatically. This is the most production-friendly option:
```yaml
# config/worker.yaml
includes:
  - celery.yaml  # broker settings

app:
  discovery:
    mode: modules
    modules:
      include:
        - app.product.model
        - app.product.jobs
        - app.product.use_cases
        - app.product.interface
        - app.product.callbacks

database:
  url: ${oc.env:DATABASE_URL}
```

```python
# src/app/worker_main.py
from loom.celery.bootstrap import bootstrap_worker

from app.runtime_config import worker_config_paths

result = bootstrap_worker(*worker_config_paths())
celery_app = result.celery_app
```
Discovery automatically includes all use cases — callbacks can call
ApplicationInvoker.entity(...) without any extra configuration.
Option 2: Manifest (explicit, deterministic)¶
Define a manifest module that lists all jobs and callbacks explicitly. Ideal for production when you want full control over what is registered:
```python
# app/manifest.py
from app.product.jobs import SendRestockEmailJob, BuildProductSummaryJob, SyncProductToErpJob
from app.product.callbacks import RestockEmailSuccessCallback, RestockEmailFailureCallback
from app.product.use_cases import DispatchRestockEmailUseCase, BuildProductSummaryUseCase
from app.product.model import Product

MODELS = [Product]
JOBS = [SendRestockEmailJob, BuildProductSummaryJob, SyncProductToErpJob]
CALLBACKS = [RestockEmailSuccessCallback, RestockEmailFailureCallback]
USE_CASES = [DispatchRestockEmailUseCase, BuildProductSummaryUseCase]
```

```yaml
# config/worker.yaml
includes:
  - celery.yaml

app:
  discovery:
    mode: manifest
    manifest:
      module: app.manifest

database:
  url: ${oc.env:DATABASE_URL}
```
Option 3: Explicit jobs in code¶
For simple workers or micro-services with a single job, pass jobs directly.
If app.discovery is also present in the YAML (e.g. inherited from a shared
base.yaml), use cases discovered there are automatically added to the registry —
callbacks can use ApplicationInvoker without any extra parameter:
```python
# src/app/worker_main.py
from loom.celery.bootstrap import bootstrap_worker

from app.product.jobs import SendRestockEmailJob
from app.product.callbacks import RestockEmailSuccessCallback

result = bootstrap_worker(
    "config/worker.yaml",  # includes base.yaml → app.discovery auto-supplements registry
    jobs=[SendRestockEmailJob],
    callbacks=[RestockEmailSuccessCallback],
)
celery_app = result.celery_app
```
If app.discovery is not in the YAML and callbacks need ApplicationInvoker,
pass the interfaces whose routes the callbacks interact with:
```python
from app.product.interface import ProductInterface

result = bootstrap_worker(
    "config/worker.yaml",
    jobs=[SendRestockEmailJob],
    interfaces=[ProductInterface],  # auto-extracts use cases from routes (including AutoCRUD)
    callbacks=[RestockEmailSuccessCallback],
)
```
Or pass use-case classes directly for non-AutoCRUD scenarios:
```python
from app.product.use_cases import GetProductUseCase, UpdateProductUseCase

result = bootstrap_worker(
    "config/worker.yaml",
    jobs=[SendRestockEmailJob],
    use_cases=[GetProductUseCase, UpdateProductUseCase],
    callbacks=[RestockEmailSuccessCallback],
)
```
interfaces= and use_cases= can be combined. Both are additive on top of any
discovered compilables — they never disable discovery.
Callbacks and ApplicationInvoker — how it works¶
Callbacks are DI-injected. When a callback declares app: ApplicationInvoker in
its constructor, the container provides an AppInvoker backed by the worker’s
UseCaseFactory and UseCaseRegistry.
app.entity(Product) constructs a key like "product:get" and resolves it against
the registry. The use-case must be compiled in the worker process — if it is not,
a KeyError is raised at callback execution time.
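The key scheme can be illustrated with a toy registry (the class and method names here are illustrative, not loom-kernel internals):

```python
class UseCaseRegistry:
    """Sketch of the lookup described above — not loom-kernel's actual registry.

    Entity operations resolve to "<entity>:<op>" keys; a use case that was
    never compiled in this worker process surfaces as a KeyError at lookup time.
    """

    def __init__(self) -> None:
        self._compiled: dict[str, object] = {}

    def register(self, entity: str, op: str, use_case: object) -> None:
        self._compiled[f"{entity}:{op}"] = use_case

    def resolve(self, entity: str, op: str) -> object:
        # Raises KeyError when the use case is not compiled in this process
        return self._compiled[f"{entity}:{op}"]
```

This is why a worker bootstrapped with only `jobs=` can fail inside a callback: the `"product:get"` key exists only if the corresponding use case was registered by discovery, a manifest, `interfaces=`, or `use_cases=`.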
The three ways to ensure this are described in Worker bootstrap:
- Use discovery — all use cases are included automatically
- Pass `interfaces=` — use cases are extracted from interface route declarations
- Pass `use_cases=` — declare use cases explicitly
When using the jobs= explicit bootstrap and worker.yaml inherits
app.discovery from a base config, the framework supplements the registry
automatically from the discovery config — no code change needed.
Start the worker¶
```shell
# development
uv run celery -A app.worker_main:celery_app worker --loglevel=INFO

# specific queues only
uv run celery -A app.worker_main:celery_app worker \
  --queues=notifications,analytics \
  --loglevel=INFO

# multiple queues with concurrency
uv run celery -A app.worker_main:celery_app worker \
  --queues=notifications,analytics,erp \
  --concurrency=4 \
  --loglevel=INFO
```
Integration tests¶
Set task_always_eager: true in the test config so tasks run synchronously in the
same process without requiring a broker:
```yaml
# config/test.yaml
celery:
  broker_url: "redis://localhost:6379/15"
  result_backend: "redis://localhost:6379/15"
  task_always_eager: true
```
Or patch it in a pytest fixture:
```python
import pytest

from app.worker_main import celery_app


@pytest.fixture(autouse=True)
def celery_eager(monkeypatch):
    # monkeypatch restores the original value when each test finishes
    monkeypatch.setattr(celery_app.conf, "task_always_eager", True)
```
Docker-compose stack¶
The companion dummy-loom ships a
full compose stack with postgres, redis, API, worker, and Flower:
```shell
make up    # start all services
make logs  # follow logs for all services
make down  # tear down
```
Services after make up:
| Service | Port | Description |
|---|---|---|
| `api` | 8000 | FastAPI application (Swagger UI) |
| `worker` | — | Celery worker (queues: notifications, analytics, erp) |
| `flower` | 5555 | Celery Flower dashboard |
| `postgres` | 5432 | PostgreSQL 16 |
| `redis` | 6379 | Broker and result backend |
Flower is available at http://localhost:5555 after make up.