Chain two remote tasks in celery by send_task
I tried to do the same. I couldn't find any built-in way to create a task just by its name, but it wouldn't be hard to add a small shim like this:
from celery import Task as BaseTask

class Task(BaseTask):
    def __init__(self, name, *args, **kwargs):
        super(Task, self).__init__(*args, **kwargs)
        self.name = name
With this class, you can do things like:
(
    Task('worker.hello').s('world') |
    Task('messaging.email-results').s(email_address='user@company.com')
)()
Or alternatively:
app.send_task(
    'worker.hello', ['world'],
    chain=[
        Task('messaging.email-results').s(email_address='user@company.com')
    ]
)
EDIT:
Apart from the above, I figured out that the correct way to do this is to use the Signature class (as @Chrismit mentioned below):
from celery import Signature
(
    Signature('worker.hello', args=['world']) |
    Signature('messaging.email-results', kwargs={'email_address': 'user@company.org'})
)()
Or alternatively:
from celery import Signature
app.send_task(
    'worker.hello', ['world'],
    chain=[
        Signature('messaging.email-results', kwargs={'email_address': 'user@company.com'})
    ]
)
Important note: tasks after the first one in the chain are not actually scheduled until the task before them has been processed (which makes sense, since their input isn't known before the previous task completes). The follow-up tasks are scheduled by the worker's code. For this reason, you need to make sure one of the following is true:
- Each of your workers knows the task_routes, so it can put follow-up tasks on the appropriate queue (for example, in my case it needs to know that tasks starting with messaging.* must go on the 'messaging' queue), or
- You have hard-coded the correct queue in each Signature when chaining. Celery already has the tooling to resolve a queue name from a task name, which you can rely on:

def get_queue_name(task_name):
    return app.amqp.router.route({}, task_name)['queue'].name

(
    Signature('worker.hello', args=['world']) |
    Signature(
        'messaging.email-results',
        kwargs={'email_address': 'user@company.org'},
        # If typing the task name twice annoys you, you could
        # subclass Signature to do this automatically
        queue=get_queue_name('messaging.email-results')
    )
)()
(I think this is the cleanest solution as it keeps workers unaware of each other)
- All tasks run in the default queue. If you have no task_routes declared and no queue specified on the task signature, Celery schedules the task on the worker's default_queue, which is 'celery' unless configured otherwise. I don't really recommend this, as it is not very explicit and does not allow much queue management, but it is an option nonetheless.
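The glob matching that task_routes performs can be illustrated in plain Python. This is a simplification of what Celery's router does internally (resolve_queue is a made-up helper for this sketch, not a Celery API):

```python
from fnmatch import fnmatch

# The same kind of mapping you would put in app.conf.task_routes.
task_routes = {
    'messaging.*': {'queue': 'messaging'},
}

def resolve_queue(task_name, routes, default='celery'):
    """Return the queue of the first matching pattern, else the default queue."""
    for pattern, options in routes.items():
        if fnmatch(task_name, pattern):
            return options['queue']
    return default

print(resolve_queue('messaging.email-results', task_routes))  # messaging
print(resolve_queue('worker.hello', task_routes))             # celery
```

This shows why, without a matching route or an explicit queue on the signature, everything falls back to the 'celery' default queue.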
You can use Celery canvas objects:
from celery import chain

my_chain = chain(task1.s(arg1, arg2), task2.s(arg3, arg4))
result = my_chain.apply_async()