In Celery, how can I keep updating the state of a main task until all of its subtasks have completed?

In Celery, I run a main task that launches one subtask for each item it receives from a request. The subtasks must run in parallel. In the UI, I have a progress bar that shows how many subtasks have finished so far, and I update the state of the main task to feed that progress bar. My problem is that the main task finishes right after sending all the subtasks to the broker, so I can no longer update its state. I would like the main task to wait until all subtasks are completed. Is that possible? Are there any other solutions? Here's my pseudocode (the real code doesn't use global ;-)).

total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})

      



3 answers


I found a way using TaskSet. But I am not completely satisfied, because I cannot ignore the results of the subtasks. If I ignore the result of the process_doc task, results.ready() always returns False, results.completed_count() always returns 0, etc. Here is the code:



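import time

from celery.states import STARTED
from celery.task import task
from celery.task.sets import TaskSet

# Document is the application's Django model; import it from your own app.

@task(ignore_result=True)
def copy_media(path):
    # Initialise the counters before the first state update.
    total, done, doc_name = 0, 0, ''
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    while not results.ready():
        done = results.completed_count()
        if done:
            # Scan backwards from index done - 1 for a finished subtask.
            for idx in range(done - 1, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

@task()  # results must be stored, otherwise ready() and completed_count() never change
def process_doc(document, path):
    # Do some stuff
    return document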

      



You can use a memcached cache to store the number of completed tasks. Django's cache API even has cache.incr for atomic increments with memcached, so concurrent counter updates don't overwrite each other.



Also, making the main task wait until all subtasks are complete is a bad idea, because you block one of the Celery workers for a long time. If Celery is running with a single worker process, the main task will block forever, since no worker is free to run the subtasks.
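The counter approach from this answer can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a definitive implementation: CounterStore is a hypothetical in-memory stand-in for the cache so that the example runs on its own, a thread pool stands in for the Celery workers, and the key names copy_media:total and copy_media:done are made up for illustration. With Django and memcached, the three store methods would correspond to cache.set, cache.incr and cache.get.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class CounterStore:
    """Hypothetical in-memory stand-in for a cache with atomic increments."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def set(self, key, value):
        with self._lock:
            self._data[key] = value

    def incr(self, key, delta=1):
        # The lock makes the read-modify-write step atomic.
        with self._lock:
            self._data[key] += delta
            return self._data[key]

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)


store = CounterStore()


def process_doc(document, path):
    # Do some stuff with the document, then record that it is finished.
    store.incr("copy_media:done")
    return document


def copy_media(documents, path, pool):
    # Publish the counters first, then queue the subtasks and return at once;
    # the main task never waits, so it does not tie up a worker.
    store.set("copy_media:total", len(documents))
    store.set("copy_media:done", 0)
    return [pool.submit(process_doc, doc, path) for doc in documents]


def progress():
    # What the progress-bar endpoint would read on each poll.
    return {"total": store.get("copy_media:total", 0),
            "done": store.get("copy_media:done", 0)}


def run_demo(documents, path="/tmp/media"):
    with ThreadPoolExecutor(max_workers=4) as pool:
        copy_media(documents, path, pool)
    # Leaving the with-block waits for all submitted work to finish.
    return progress()
```

The UI polls progress() instead of the main task's state, so it no longer matters that copy_media has already returned.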



I don't know which version of Celery you are using, but you could look at group for running the subtasks (new in 3.0).
