How to clear tasks with countdown timers from celery queue

There are hundreds of countdown tasks in my Celery queue that will make them run over the next few hours. Is there a way for these tasks to run immediately so that the queue is effectively flushed out?

I am currently planning an upgrade to our server and I want to make sure no background tasks are running during the upgrade. If I have to wait for these countdowns, that's fine, but I would rather force tasks to complete.

Another option might be to pause the queue until the update is complete, but flushing seems to be the best option.

EDIT : I figured out how to find the list of scheduled tasks:

from celery.task.control import inspect
i = inspect()
tasks = i.scheduled()

      

Now I just need to figure out how to get them to execute.

+3


source to share


1 answer


Okay, I'm sure I've figured out how to do this. I'm making this wiki answer and jotting down my notes in case anyone wants to set up a general process here.

The general idea is this:

Note that this does not support adding eta

to items after they are re-queued, as is possible for implementation.

So, to find out which tasks are queued, follow these steps:

from celery.task.control import inspect
i = inspect()
scheduled_tasks = i.scheduled()

      



Which returns a dict, for example:

{u'w1.courtlistener.com': [{u'eta': 1414435210.198864,
   u'priority': 6,
   u'request': {u'acknowledged': False,
    u'args': u'(2745724,)',
    u'delivery_info': {u'exchange': u'celery',
     u'priority': None,
     u'routing_key': u'celery'},
    u'hostname': u'w1.courtlistener.com',
    u'id': u'99bc8650-3be1-4d24-81d6-a882d77a8b25',
    u'kwargs': u'{}',
    u'name': u'citations.tasks.update_document_by_id',
    u'time_start': None,
    u'worker_pid': None}}]}

      

The next step is to cancel all these tasks, for example:

from celery.task.control import revoke
with open('revoked_tasks.csv', 'w') as f:
    for worker, tasks in scheduled_tasks.iteritems():
        print "Now processing worker: %s" % worker
        for task in tasks:
            print "Now revoking task: %s. %s with args: %s and kwargs: %s" % \
              (task['request']['id'], task['request']['name'], task['request']['args'], task['request']['kwargs'])
            f.write('%s|%s|%s|%s|%s\n' % (worker, task['request']['name'], task['request']['id'], task['request']['args'], task['request']['kwargs']))
            revoke(task['request']['id'], terminate=True)

      

Then, finally, re-run the tasks as usual by loading them from the CSV file:

with open('revoked_tasks', 'r') as f:
    for line in f:
        worker, command, id, args, kwargs = line.split("|")
        # Impost task here, something like...
        package, module = command.rsplit('.', 1)
        mod = __import__(package, globals(), locals(), [module])

        # Run the commands, something like...
        mod.__get_attribute__(module).delay(args*, kwargs**)

      

+2


source







All Articles