Chain two remote tasks in Celery with send_task

In Celery, I want to chain two tasks that run on a remote worker. Can someone tell me how to specify this with send_task? Or is there any other way to call remote tasks?

BR

+3

4 answers


The easiest way is to use the signature() function:



from celery import signature

# Build each task signature by name, then combine with |= to form a chain
chain = signature(
    'app_name.task_1',
    kwargs={},  # your task_1 keyword arguments here
    queue='this_queue'
)
chain |= signature(
    'app_name2.task2',
    kwargs={},  # your task2 keyword arguments here
    queue='another_queue'
)
chain.apply_async()
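One thing worth knowing about chained signatures: Celery passes the return value of each task as the first positional argument of the next one. Here is a broker-free sketch of that data flow (hello, email_results and run_chain are hypothetical stand-ins for illustration, not real Celery API):

```python
# Broker-free sketch of how a Celery chain threads results along:
# the return value of each task becomes the first positional argument
# of the next. hello/email_results are hypothetical stand-in tasks.

def hello(name):
    return 'Hello, {0}!'.format(name)

def email_results(body, email_address):
    return 'sent {0!r} to {1}'.format(body, email_address)

def run_chain(steps, first_args):
    """Apply each (func, kwargs) step, feeding the result forward."""
    func, kwargs = steps[0]
    result = func(*first_args, **kwargs)
    for func, kwargs in steps[1:]:
        result = func(result, **kwargs)
    return result

out = run_chain(
    [(hello, {}), (email_results, {'email_address': 'user@company.com'})],
    ['world'],
)
print(out)  # -> sent 'Hello, world!' to user@company.com
```

If your second task should not receive the first task's result, use an immutable signature (.si() on registered tasks, or immutable=True on a Signature) instead.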

+1

I tried to do the same. I couldn't find any built-in way to create a task just from its name, but it isn't hard to add a small helper class like this:

from celery import Task as BaseTask

class Task(BaseTask):
    """Task stub that can be referred to by name without importing it."""
    def __init__(self, name, *args, **kwargs):
        super(Task, self).__init__(*args, **kwargs)  # note: Task, not BaseTask
        self.name = name


With this class, you can do things like:

(
    Task('worker.hello').s('world') | 
    Task('messaging.email-results').s(email_address='user@company.com')
)()


Or alternatively:

app.send_task(
    'worker.hello', ['world'], 
    chain=[
        Task('messaging.email-results').s(email_address='user@company.com')
    ]
)


EDIT:

Apart from the above, I figured out that the correct way to do this is to use the Signature class (as @Chrismit mentioned below):

from celery import Signature

(
    Signature('worker.hello', args=['world']) | 
    Signature('messaging.email-results', kwargs={'email_address':'user@company.org'})
)()


Or alternatively:

from celery import Signature

app.send_task(
    'worker.hello', ['world'], 
    chain=[
        Signature('messaging.email-results', kwargs={'email_address': 'user@company.com'})
    ]
)


Important note: tasks after the first task in a chain are not actually scheduled until the task before them has completed (this makes sense, since the follow-up task is dispatched by the worker that ran the previous one). Because those follow-up tasks are scheduled from worker code, you need to make sure that one of the following is true:

  • each of your workers knows the task_routes, so that it can place subsequent tasks in the appropriate queue (for example, in my case it needs to know that tasks matching messaging.* must go to the 'messaging' queue)
  • you hard-code the correct queue in each Signature when chaining. Celery already has the machinery to resolve a queue name from a task name, which you can reuse:

    def get_queue_name(task_name):
        return app.amqp.router.route({}, task_name)['queue'].name
    
    (
        Signature('worker.hello', args=['world']) | 
        Signature(
            'messaging.email-results',
            kwargs={'email_address':'user@company.org'},
            queue=get_queue_name('messaging.email-results')  # If typing the task name twice annoys you, you could subclass Signature to do this automatically
        )
    )()
    

    (I think this is the cleanest solution as it keeps workers unaware of each other)

  • all tasks run on the default queue. If no task_routes are declared on the worker and the task signature does not specify a queue, Celery will schedule the task on the worker's default_queue, which is 'celery' unless configured otherwise. I don't recommend this, since it is not very explicit and does not allow much queue management, but it is an option nonetheless.
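The comment in the code above suggests subclassing Signature so the queue is filled in automatically. Here is a broker-free sketch of that idea (RoutedSignature, TASK_ROUTES and the fnmatch-based lookup are assumptions for illustration; with real Celery you would subclass celery.Signature and resolve the queue via app.amqp.router.route() as shown earlier):

```python
import fnmatch

# Stand-in for Celery's task_routes configuration (hypothetical patterns)
TASK_ROUTES = {
    'worker.*': 'worker',
    'messaging.*': 'messaging',
}

def get_queue_name(task_name):
    """Resolve a queue from a task name, mimicking app.amqp.router.route()."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatch.fnmatch(task_name, pattern):
            return queue
    return 'celery'  # Celery's default queue name

class RoutedSignature(dict):
    """Signature-like mapping that fills in 'queue' from the task name."""
    def __init__(self, task_name, args=(), kwargs=None):
        super().__init__(
            task=task_name,
            args=list(args),
            kwargs=kwargs or {},
            queue=get_queue_name(task_name),  # filled in automatically
        )

sig = RoutedSignature('messaging.email-results',
                      kwargs={'email_address': 'user@company.org'})
print(sig['queue'])  # -> messaging
```

This keeps the "type the task name once" convenience while still routing each chained task to the queue its worker actually consumes.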
+1


I did it like this. The trick is that you need to create a new signature from the task name:

app.send_task(
    "{0}.{1}".format(app_name, task), args,
    link=[app.signature("{0}.{1}".format(app_name, task), next_args)]
)


0


You can use Celery canvas objects:

from celery import chain

# Note: this requires the task objects to be importable locally
my_chain = chain(task1.s(arg1, arg2), task2.s(arg3, arg4))
result = my_chain.apply_async()


-2