In Celery, how can I keep updating the state of a main task until all of its subtasks have completed?
In Celery, I run a main task that spawns one subtask for each item it receives from a request. The subtasks must run in parallel. In the UI I have a progress bar showing how many subtasks have completed in total, and I update the main task's state to feed that progress bar. My problem is that the main task finishes right after pushing all the subtasks to the broker, so I can no longer update its state. I would like the main task to wait until all subtasks have completed. Is that possible? Are there other solutions? Here is my pseudocode (the real code doesn't use global ;-)).
total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
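As a side note on the UI piece: a small helper (purely illustrative, not part of the code above) can turn the meta dict passed to update_state() into the percentage a progress bar needs:

```python
def progress_percent(meta):
    """Turn the {'total': ..., 'done': ...} meta dict into a 0-100 value."""
    total = meta.get('total', 0)
    done = meta.get('done', 0)
    if not total:
        return 0  # avoid division by zero before the document count is known
    return int(100 * done / total)
```

For example, progress_percent({'total': 8, 'done': 2}) gives 25.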
I found a way using TaskSet, but I am not completely satisfied, because I cannot ignore the results of the subtasks. If I set ignore_result=True on the process_doc task, results.ready() always returns False, results.completed_count() always returns 0, and so on. Here is the code:
@task(ignore_result=True)
def copy_media(path):
    total = done = 0
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    while not results.ready():
        done = results.completed_count()
        if done:
            last = done - 1
            # Find the most recently finished subtask to report its name
            for idx in xrange(last, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED,
                                meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

@task()
def process_doc(document, path):
    # Do some stuff
    return document
You can use a memcached-backed cache to store the number of completed tasks. The Django cache API even has cache.incr for atomic incrementing, which ensures that concurrent counter updates don't clobber each other.
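A sketch of that counter pattern (the key names and helper functions are my own invention; a dict-based stand-in mimics the two Django cache calls involved, cache.set() and cache.incr(), the latter of which a real memcached backend executes atomically server-side):

```python
class FakeCache:
    """Stand-in for django.core.cache.cache, for illustration only."""
    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def incr(self, key, delta=1):
        # With a real memcached backend this increment happens
        # server-side, so concurrent workers cannot lose updates.
        self._data[key] += delta
        return self._data[key]

cache = FakeCache()

def start_job(task_id, total):
    # Called once by the main task after counting the documents.
    cache.set('%s-total' % task_id, total)
    cache.set('%s-done' % task_id, 0)

def finish_subtask(task_id):
    # Called by each subtask as its last step.
    return cache.incr('%s-done' % task_id)

def job_progress(task_id):
    # Polled by whatever feeds the progress bar.
    return cache.get('%s-done' % task_id), cache.get('%s-total' % task_id)
```

With this scheme the main task can return immediately after queuing the subtasks; the progress bar reads the counters from the cache instead of the main task's state.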
Also, keeping the main task running until all subtasks are complete is a bad idea, because it ties up one of the Celery workers for a long time. If Celery is running with a single worker process, this leads to a deadlock: the main task occupies the only worker while waiting for subtasks that can never be scheduled.