Celery worker

loom-kernel bootstraps Celery workers from the same config and discovery system used by the REST API. Jobs are first-class citizens — they support Input(), LoadById(), Rule, and full DI injection just like use cases.


Install

pip install "loom-kernel[celery]"

Core concepts

Concept

Purpose

Job

Background unit of work — runs asynchronously on a worker

JobService.dispatch()

Enqueue a job after the current UoW commits (fire-and-forget)

JobService.run()

Execute a job immediately in the calling process

JobCallback

Hook invoked by the broker on task success or failure

bootstrap_worker()

Single entry point to start a Celery worker

CeleryConfig

YAML-driven broker and worker settings

JobConfig

Per-job routing and retry overrides from YAML


Define a job

A Job declares its queue via __queue__ and implements execute(). LoadById loads an entity from the repository before execute() runs — same pattern as use cases:

# app/product/jobs.py
from loom.core.command import Command
from loom.core.job.job import Job
from loom.core.use_case import Input, LoadById

from app.product.model import Product


class SendRestockEmailJobCommand(Command, frozen=True):
    product_id: int
    recipient_email: str


class SendRestockEmailJob(Job[bool]):
    """Send a restock notification email. Returns True when email was sent."""

    __queue__ = "notifications"
    __retries__ = 2
    __timeout__ = 30  # soft time limit in seconds

    async def execute(
        self,
        product_id: int,
        cmd: SendRestockEmailJobCommand = Input(),
        product: Product = LoadById(Product, by="product_id"),
    ) -> bool:
        if product.stock > 0:
            return False
        # send email to cmd.recipient_email …
        return True

Job ClassVars reference

ClassVar

Type

Default

Description

__queue__

str

"default"

Target Celery queue

__retries__

int

3

Max automatic retries on failure

__countdown__

int

0

Delay in seconds before first execution

__timeout__

int | None

None

Soft time limit in seconds

__priority__

int

0

Task priority (broker-dependent)

All ClassVars can be overridden per-environment from YAML without touching code — see jobs section — per-job overrides.


Dispatch a job

Use JobService.dispatch() inside a use case to enqueue a job after the current Unit of Work commits. The job is not sent if the UoW rolls back — dispatch is transactionally safe:

# app/product/use_cases.py
from loom.core.job.service import JobService
from loom.core.use_case.use_case import UseCase
from loom.core.use_case import Input, LoadById

from app.product.model import Product
from app.product.jobs import SendRestockEmailJob
from app.product.callbacks import RestockEmailSuccessCallback, RestockEmailFailureCallback


class DispatchRestockEmailUseCase(UseCase[Product, DispatchRestockEmailResponse]):
    def __init__(self, job_service: JobService) -> None:
        self._jobs = job_service

    async def execute(
        self,
        product_id: str,
        cmd: DispatchRestockEmailCommand = Input(),
        product: Product = LoadById(Product, by="product_id"),
    ) -> DispatchRestockEmailResponse:
        handle = self._jobs.dispatch(
            SendRestockEmailJob,
            params={"product_id": int(product_id)},
            payload={
                "product_id": int(product_id),
                "recipient_email": cmd.recipient_email,
            },
            on_success=RestockEmailSuccessCallback,
            on_failure=RestockEmailFailureCallback,
        )
        # handle.job_id is a stable UUID you can return to the caller
        return DispatchRestockEmailResponse(job_id=handle.job_id, queue=handle.queue)

dispatch() returns a JobHandle immediately — the broker call happens after the UoW commits. The HTTP response does not wait for the job to complete.

dispatch() parameters

Parameter

Type

Description

job_type

type[Job]

Concrete Job subclass to enqueue

params

dict | None

Path/query params for execute() signature

payload

dict | None

JSON body passed as Input() command

on_success

type[Callback] | None

Callback invoked by the broker on success

on_failure

type[Callback] | None

Callback invoked by the broker on failure

queue

str | None

Override the job’s __queue__ for this dispatch only

countdown

int | None

Delay in seconds before execution

priority

int | None

Override the job’s __priority__

eta

datetime | None

Absolute datetime for first execution


Run a job inline

JobService.run() executes a job immediately in the calling process and returns the result. Use it when the use case needs the output before responding — for example in a synchronous workflow step:

class BuildProductSummaryUseCase(UseCase[Product, ProductSummaryResponse]):
    def __init__(self, job_service: JobService) -> None:
        self._jobs = job_service

    async def execute(self, product_id: str) -> ProductSummaryResponse:
        # runs immediately, no broker involved
        summary = await self._jobs.run(
            BuildProductSummaryJob,
            params={"product_id": int(product_id)},
        )
        return ProductSummaryResponse(product_id=int(product_id), summary=summary)

run() works identically in both InlineJobService (development / tests) and CeleryJobService (production). When Celery is configured, it bypasses the broker entirely — the job runs in the API process via the same DI container.

Use dispatch() for fire-and-forget work; use run() when you need the result now.


Callbacks

Callbacks are injected by the DI container and can interact with the application via ApplicationInvoker. Implement on_success and/or on_failure — both can be sync or async:

# app/product/callbacks.py
from typing import Any
from loom.core.use_case.invoker import ApplicationInvoker
from app.product.model import Product


class RestockEmailSuccessCallback:
    """Tags the product category when a restock email is sent successfully."""

    def __init__(self, app: ApplicationInvoker) -> None:
        self._app = app

    async def on_success(self, job_id: str, result: Any, **context: Any) -> None:
        if not bool(result):
            return  # job ran but nothing was sent
        product_id = context.get("product_id")
        entity = self._app.entity(Product)
        product = await entity.get(params={"id": product_id})
        if product:
            await entity.update(
                params={"id": product_id},
                payload={"category": f"{product.category}-restock-notified"},
            )

    def on_failure(self, job_id: str, exc_type: str, exc_msg: str, **context: Any) -> None:
        pass  # log or send alert


class RestockEmailFailureCallback:
    """Tags the product category when restock email dispatch fails."""

    def __init__(self, app: ApplicationInvoker) -> None:
        self._app = app

    def on_success(self, job_id: str, result: Any, **context: Any) -> None:
        pass

    async def on_failure(
        self, job_id: str, exc_type: str, exc_msg: str, **context: Any
    ) -> None:
        product_id = context.get("product_id")
        entity = self._app.entity(Product)
        product = await entity.get(params={"id": product_id})
        if product:
            await entity.update(
                params={"id": product_id},
                payload={"category": f"{product.category}-restock-failed"},
            )

Callback method signatures

# called on task success — result is the value returned by Job.execute()
async def on_success(self, job_id: str, result: Any, **context: Any) -> None: ...

# called on task failure — exc_type / exc_msg describe the exception
async def on_failure(self, job_id: str, exc_type: str, exc_msg: str, **context: Any) -> None: ...

**context contains the raw payload dict passed to dispatch() — use it to recover entity IDs and other parameters the callback needs.

Fire-and-forget: when a job dispatched via dispatch() fails, the failure callback is invoked and the exception is silenced. The HTTP layer is not affected. Use run() when you need the exception to propagate synchronously.


YAML configuration reference

celery section

# config/celery.yaml
celery:
  broker_url: ${oc.env:CELERY_BROKER_URL,redis://redis:6379/1}
  result_backend: ${oc.env:CELERY_RESULT_BACKEND,redis://redis:6379/2}

  # Worker pool
  worker_concurrency: 4               # concurrent worker processes (default: 4)
  worker_prefetch_multiplier: 1       # 1 = fair dispatch, avoids starvation (default: 1)
  worker_max_tasks_per_child: 1000    # recycle worker after N tasks, prevents leaks (default: 1000)

  # Serialization
  task_serializer: json               # default: json
  result_serializer: json             # default: json
  accept_content: [json]              # default: [json]

  # Time
  timezone: UTC
  enable_utc: true

  # Queues
  # Declare explicit queues so the worker knows which to consume.
  # If omitted, Celery creates queues on demand.
  queues: [default, notifications, analytics, erp]

  # task_default_queue: queue used for tasks/callbacks without an explicit queue=.
  # MUST be one of 'queues' — the framework raises ValueError at startup otherwise.
  # Callbacks (on_success / on_failure) land here when no queue is set.
  task_default_queue: default

  # For integration tests: run tasks synchronously in the calling process.
  task_always_eager: false

jobs section — per-job overrides

Override any Job ClassVar from YAML without touching Python code. Only the fields you declare are applied; unset fields keep their ClassVar defaults:

# config/worker.yaml
jobs:
  SendRestockEmailJob:
    queue: notifications   # overrides __queue__ = "notifications"
    retries: 2             # overrides __retries__ = 2
    timeout: 30            # overrides __timeout__ = 30 (soft time limit)

  BuildProductSummaryJob:
    queue: analytics
    retries: 1

  SyncProductToErpJob:
    queue: erp
    retries: 3
    countdown: 5           # delay 5 seconds before first attempt
    priority: 5

Field

Job ClassVar

Description

queue

__queue__

Target Celery queue

retries

__retries__

Max automatic retries

countdown

__countdown__

Delay in seconds before execution

timeout

__timeout__

Soft time limit in seconds

priority

__priority__

Task priority


Worker bootstrap

Option 2: Manifest (explicit, deterministic)

Define a manifest module that lists all jobs and callbacks explicitly. Ideal for production when you want full control over what is registered:

# app/manifest.py
from app.product.jobs import SendRestockEmailJob, BuildProductSummaryJob, SyncProductToErpJob
from app.product.callbacks import RestockEmailSuccessCallback, RestockEmailFailureCallback
from app.product.use_cases import DispatchRestockEmailUseCase, BuildProductSummaryUseCase
from app.product.model import Product

MODELS = [Product]
JOBS = [SendRestockEmailJob, BuildProductSummaryJob, SyncProductToErpJob]
CALLBACKS = [RestockEmailSuccessCallback, RestockEmailFailureCallback]
USE_CASES = [DispatchRestockEmailUseCase, BuildProductSummaryUseCase]
# config/worker.yaml
includes:
  - celery.yaml

app:
  discovery:
    mode: manifest
    manifest:
      module: app.manifest

database:
  url: ${oc.env:DATABASE_URL}

Option 3: Explicit jobs in code

For simple workers or micro-services with a single job, pass jobs directly. If app.discovery is also present in the YAML (e.g. inherited from a shared base.yaml), use cases discovered there are automatically added to the registry — callbacks can use ApplicationInvoker without any extra parameter:

# src/app/worker_main.py
from loom.celery.bootstrap import bootstrap_worker
from app.product.jobs import SendRestockEmailJob
from app.product.callbacks import RestockEmailSuccessCallback

result = bootstrap_worker(
    "config/worker.yaml",   # includes base.yaml → app.discovery auto-supplements registry
    jobs=[SendRestockEmailJob],
    callbacks=[RestockEmailSuccessCallback],
)
celery_app = result.celery_app

If app.discovery is not in the YAML and callbacks need ApplicationInvoker, pass the interfaces whose routes the callbacks interact with:

from app.product.interface import ProductInterface

result = bootstrap_worker(
    "config/worker.yaml",
    jobs=[SendRestockEmailJob],
    interfaces=[ProductInterface],     # auto-extracts use-cases from routes (including AutoCRUD)
    callbacks=[RestockEmailSuccessCallback],
)

Or pass use-case classes directly for non-AutoCRUD scenarios:

from app.product.use_cases import GetProductUseCase, UpdateProductUseCase

result = bootstrap_worker(
    "config/worker.yaml",
    jobs=[SendRestockEmailJob],
    use_cases=[GetProductUseCase, UpdateProductUseCase],
    callbacks=[RestockEmailSuccessCallback],
)

interfaces= and use_cases= can be combined. Both are additive on top of any discovered compilables — they never disable discovery.


Callbacks and ApplicationInvoker — how it works

Callbacks are DI-injected. When a callback declares app: ApplicationInvoker in its constructor, the container provides an AppInvoker backed by the worker’s UseCaseFactory and UseCaseRegistry.

app.entity(Product) constructs a key like "product:get" and resolves it against the registry. The use-case must be compiled in the worker process — if it is not, a KeyError is raised at callback execution time.

The three ways to ensure this are described in Worker bootstrap:

  1. Use discovery — all use cases are included automatically

  2. Pass interfaces= — use cases are extracted from interface route declarations

  3. Pass use_cases= — declare use cases explicitly

When using the jobs= explicit bootstrap and worker.yaml inherits app.discovery from a base config, the framework supplements the registry automatically from the discovery config — no code change needed.


Start the worker

# development
uv run celery -A app.worker_main:celery_app worker --loglevel=INFO

# specific queues only
uv run celery -A app.worker_main:celery_app worker \
    --queues=notifications,analytics \
    --loglevel=INFO

# multiple queues with concurrency
uv run celery -A app.worker_main:celery_app worker \
    --queues=notifications,analytics,erp \
    --concurrency=4 \
    --loglevel=INFO

Integration tests

Set task_always_eager: true in the test config so tasks run synchronously in the same process without requiring a broker:

# config/test.yaml
celery:
  broker_url: "redis://localhost:6379/15"
  result_backend: "redis://localhost:6379/15"
  task_always_eager: true

Or patch it in a pytest fixture:

import pytest
from app.worker_main import celery_app

@pytest.fixture(autouse=True)
def celery_eager(monkeypatch):
    celery_app.conf.task_always_eager = True
    yield
    celery_app.conf.task_always_eager = False

Docker-compose stack

The companion dummy-loom ships a full compose stack with postgres, redis, API, worker, and Flower:

make up      # start all services
make logs    # follow logs for all services
make down    # tear down

Services after make up:

Service

Port

Description

api

8000

FastAPI application (Swagger at /docs)

worker

Celery worker (queues: notifications, analytics, erp)

flower

5555

Celery Flower dashboard

postgres

5432

PostgreSQL 16

redis

6379

Broker and result backend

Flower is available at http://localhost:5555 after make up.