How can I configure Celery to call custom initialization?

I am completely new to Celery and I was trying to set up a project with two separate queues (one for calculation and one for execution). So far, so good.

My problem is that the workers on the execution queue need to instantiate a class with a unique object_id (one ID per worker). Is it possible to write a custom initialization that creates the object at startup and keeps it in memory until the worker is killed?

I found a similar question in custom_task, but the suggested solution does not work in my case.

Considering the following toy example:

celery.py

from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)

if __name__ == '__main__':
    app.start()

      

tasks.py

from proj.celery import app
from celery.signals import worker_init

@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
    pass  # SETUP id=1 for add1 here???

@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
    pass  # SETUP id=2 for add1 here???

@app.task
def add1(y):
    # id is meant to be the per-worker value set up above
    return id + y

@app.task
def add(x, y):
    return x + y

      

initialization:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

      

Is this the correct approach? If so, what should I write in the function configure_worker1 in tasks.py to set id when the worker is initialized?

Thanks.

1 answer


I found the answer by following the Celery documentation on task instantiation: http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation

tasks.py now looks like this:

from proj.celery import app
from celery import Task

class Task1(Task):
    def __init__(self):
        self._x = 1.0

class Task2(Task):
    def __init__(self):
        self._x = 2.0

@app.task(base=Task1)
def add1(y):
    return add1._x + y

@app.task(base=Task2)
def add2(y):
    return add2._x + y

      



The initialization is still:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
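
Note that the base-class approach above fixes one value per task, not per worker. If the ID really has to differ per worker, as in the question, one possible sketch is to derive it from the worker's node name (for example worker1@hostname) at startup. The helper worker_id_from_nodename, the STATE dictionary and the handler on_celeryd_init are hypothetical names introduced here for illustration, and the sketch assumes node names end in a number before the @. The parsing logic is plain Python; the Celery hookup is shown only in comments and relies on the celeryd_init signal, whose sender is the node name.

```python
import re

# Hypothetical module-level holder for the per-worker value (not part of Celery).
STATE = {"worker_id": None}


def worker_id_from_nodename(nodename):
    """Extract the trailing number from a node name such as 'worker1@hostname'."""
    name = nodename.split("@", 1)[0]      # 'worker1@hostname' -> 'worker1'
    match = re.search(r"(\d+)$", name)    # digits at the end of the name
    if match is None:
        raise ValueError("no numeric suffix in node name: %r" % nodename)
    return int(match.group(1))


def on_celeryd_init(sender=None, **kwargs):
    """Signal handler: remember the ID for this worker when it starts."""
    STATE["worker_id"] = worker_id_from_nodename(sender)


# Assumed hookup in tasks.py (requires Celery, so it is left as comments):
#
#   from celery.signals import celeryd_init
#   celeryd_init.connect(on_celeryd_init)
#
#   @app.task
#   def add1(y):
#       return STATE["worker_id"] + y
```

With the default prefork pool on a fork-based platform, the value set in the main worker process should be inherited by the pool processes, but this is an assumption worth verifying for your setup; connecting the same logic to worker_process_init is an alternative if it is not.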

      
