Celery
Though it is not required, you can use the dishka-celery integration. It features:
automatic REQUEST scope management using signals
injection of dependencies into task handler functions using the @inject decorator
automatic injection of dependencies into task handler functions using the DishkaTask task class.
How to use
Import
from dishka.integrations.celery import (
    DishkaTask,
    FromDishka,
    inject,
    setup_dishka,
)
from dishka import make_container, Provider, provide, Scope
Create a provider and a container as usual
(optional) Set DishkaTask as the task class of your Celery app to enable automatic injection for all task handlers
celery_app = Celery(task_cls=DishkaTask)
or for one task handler
@celery_app.task(base=DishkaTask)
def start(
    gateway: FromDishka[Gateway],
):
    ...
Mark the task handler parameters that should be injected with FromDishka[]
@celery_app.task
def start(
    gateway: FromDishka[Gateway],
):
    ...
(optional) Decorate them using @inject if you are not using DishkaTask.
@celery_app.task
@inject
def start(
    gateway: FromDishka[Gateway],
):
    ...
(optional) Set up a signal handler to close the container when the worker process shuts down
from celery import current_app
from celery.signals import worker_process_shutdown
from dishka import Container
@worker_process_shutdown.connect()
def close_dishka(*args, **kwargs):
    container: Container = current_app.conf["dishka_container"]
    container.close()
Set up the dishka integration
setup_dishka(container=container, app=celery_app)