Unit testing celery tasks directly
I know this will be considered a duplicate, but I had a look around before asking this question; however, all the existing questions seem to be either outdated or unhelpful for my problem. This is where I looked before writing this question:
- Official documents
- How did you unit test the Celery job? (5 years old, all dead links)
- How to unit test the code that runs celery tasks? (2 years old)
- How to fix Celery tasks during unit testing? (3 years old)
I am currently working on a project that makes heavy use of Celery for handling asynchronous tasks. To make the whole codebase stable, I am writing unit tests for the whole project, but so far I have not been able to write a single working test for Celery.
Most of my code needs to track the tasks that have been started, in order to determine whether or not all of the results are ready for polling. This is implemented in my code like this:
@app.task(bind=True)
def some_task(self, record_id):
    associate(self.request.id, record_id)  # Not the actual DB code, but you get the idea

# Somewhere else in my code, e.g. a Flask endpoint
record = some_db_record()
some_task.apply_async(args=[record.id])
Since I don't have a *nix-based machine to run my code on, I tried to solve this problem by setting the task_always_eager option to True; however, this causes problems whenever any sub-task tries to query for the result:
@app.task(bind=True)
def foo(self):
    task = bar.apply_async()
    foo_poll.apply_async(args=[task.id])

@app.task(bind=True, max_retries=None)
def foo_poll(self, celery_id):
    task = AsyncResult(celery_id)
    if not task.ready():  # RuntimeError: Cannot retrieve result with task_always_eager enabled
        return self.retry(countdown=5)
    else:
        pass  # Do something with the result

@app.task
def bar():
    time.sleep(10)
I tried to work around this by patching AsyncResult, but that caused problems because self.request.id was then None:
with patch.object(AsyncResult, "_get_task_meta",
                  side_effect=lambda: {"status": SUCCESS, "result": None}) as method:
    foo()

@app.task(bind=True)
def foo(self):
    pass  # self.request.id is now None, which I need to track sub-tasks
Does anyone know how I can do this? Or whether Celery is even worth using? I am at the point where I find the documentation and any testing-related questions so lacking that I just feel like scrapping it all and going back to multithreading.
I had the same problem and came up with two possible approaches:
- Call tasks directly in tests, and wrap every place where a task dispatches other Celery work in a check on self.request.called_directly: run the sub-task directly if it is True, or via apply_async if it is False.
- Wrap task.ready() and the other state checks in helper functions that take ALWAYS_EAGER into account when reporting task readiness.
In the end I settled on a combination of both approaches, avoiding nested tasks as much as I could, and putting as little code as possible inside the @app.task functions, in order to be able to test the task logic in as much isolation as possible.
This may sound rather hacky, but in reality it is not.
Also, you can check how big players like Sentry do it (spoiler: mocks and a few helpful utilities).
So it's definitely possible; it's just not easy to find established best practices.
I haven't used Celery in a while, but unless something has changed, you'll have to call your task functions directly in order to unit test them.
@app.task(bind=True, max_retries=None)
def foo_poll(self, celery_id):
    task = AsyncResult(celery_id)
    if not task.ready():  # RuntimeError: Cannot retrieve result with task_always_eager enabled
        return self.retry(countdown=5)
    else:
        pass  # Do something with the result
For your unit test, you can:
- Patch AsyncResult so that the branch you want to test is taken
- Create any objects your task depends on
- Patch the retry method and assert that it is called
- Call your task function directly
This, of course, only exercises the logic of the method, not Celery itself. I usually add one or two integration (collaboration) tests that set ALWAYS_EAGER, so that the test runs through the Celery code even though Celery executes everything in-memory rather than through a real broker.