Unit testing celery tasks directly

I know this will be considered a duplicate, but I had a look around before asking this question, and the existing questions all seem to be either outdated or of no help with my problem.


I am currently working on a project that makes heavy use of Celery for handling asynchronous tasks. To keep the whole codebase stable, I am writing unit tests for the entire project, but so far I have not been able to write a single working test for the Celery tasks.

Most of my code needs to track the tasks that have been started so it can determine whether or not all of the results are ready for polling. This is implemented roughly like this:

@app.task(bind=True)
def some_task(self, record_id):
    associate(self.request.id, record_id)  # Not the actual DB code, but you get the idea

# Somewhere else in my code, e.g. a Flask endpoint
record = some_db_record()
some_task.apply_async(args=[record.id])


Since I don't have a *nix-based machine to run workers on, I tried to solve this problem by setting the task_always_eager option to True; however, this causes problems whenever a sub-task tries to query for a result:

import time

from celery.result import AsyncResult

@app.task(bind=True)
def foo(self):
    task = bar.apply_async()
    foo_poll.apply_async(args=[task.id])

@app.task(bind=True, max_retries=None)
def foo_poll(self, celery_id):
    task = AsyncResult(celery_id)
    if not task.ready():  # RuntimeError: Cannot retrieve result with task_always_eager enabled
        return self.retry(countdown=5)
    else:
        pass  # Do something with the result

@app.task
def bar():
    time.sleep(10)
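
For reference, this is a minimal sketch of the eager configuration I mean, assuming a celery.Celery instance named app and Celery 4+ setting names (the task_eager_propagates line is my own addition so that exceptions surface in tests, not something from my original setup):

app.conf.update(
    task_always_eager=True,       # run tasks locally and synchronously instead of sending them to a worker
    task_eager_propagates=True,   # re-raise task exceptions so the tests can see them
)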


I tried to fix it by patching the methods of AsyncResult; however, this caused problems because self.request.id ended up being None:

from unittest.mock import patch

from celery.result import AsyncResult
from celery.states import SUCCESS

with patch.object(AsyncResult, "_get_task_meta", side_effect=lambda: {"status": SUCCESS, "result": None}) as method:
    foo()

@app.task(bind=True)
def foo(self):
    pass   # self.request.id is now None, which I need in order to track sub-tasks


Does anyone know how I can do this, or whether Celery is even worth using here? I am at the point where I find the documentation and the existing testing-related questions so unhelpful that I feel like ripping all of this out and going back to multithreading.



2 answers


I had the same problem and came up with two possible approaches (see the sketch after the list below):

  • Call tasks in tests directly, and wrap every piece of internal Celery interaction in an if self.request.called_directly check: run the sub-task directly if it is True, or dispatch it with apply_async if it is False.
  • Wrap task.ready() and the other state checks in helper functions that take ALWAYS_EAGER into account as well as the actual task state.
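
A minimal sketch of how those two approaches can look, assuming the foo/bar tasks and the app instance from the question (the helper name task_is_ready is mine, and the exact config check may need adjusting for your Celery version):

from celery.result import AsyncResult

@app.task(bind=True)
def foo(self):
    # Approach 1: branch on how the task was invoked.
    if self.request.called_directly:
        bar()                 # in a test, run the sub-task synchronously
    else:
        bar.apply_async()     # when run by a worker, dispatch it asynchronously

def task_is_ready(celery_id):
    # Approach 2: hide the ALWAYS_EAGER special case behind a small helper.
    if app.conf.task_always_eager:
        return True           # eager tasks have already run to completion by now
    return AsyncResult(celery_id).ready()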

In the end I used a combination of both approaches, trying to avoid nested tasks as much as I could. I also put as little code as possible inside the @app.task-decorated functions, so that the underlying task functions can be tested in as much isolation as possible.



This may sound rather messy and awkward, but in practice it is not.

You can also look at how big projects like Sentry do it (spoiler: mocks and a few handy helpers).

So it is definitely possible; the best practices are just not easy to find.



I haven't used Celery in a while, but unless something has changed, you will have to call your task methods directly to unit test them.

@app.task(bind=True, max_retries=None)
def foo_poll(self, celery_id):
    task = AsyncResult(celery_id)
    if not task.ready():  # RuntimeError: Cannot retrieve result with task_always_eager enabled
        return self.retry(countdown=5)
    else:
        pass  # Do something with the result


For your unit test, you can:



  • Patch AsyncResult so the correct branch is taken
  • Construct your task
  • Patch the retry method and assert that it is called
  • Call your method directly (a sketch follows this list)
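
A minimal sketch of such a test, assuming the foo_poll task from the question (the test name, the fake task id, and the use of unittest.mock are my assumptions):

from unittest.mock import patch

from celery.result import AsyncResult

def test_foo_poll_retries_when_not_ready():
    # Force the "not ready" branch, then check that a retry was requested.
    with patch.object(AsyncResult, "ready", return_value=False), \
         patch.object(foo_poll, "retry") as mock_retry:
        foo_poll("some-task-id")   # bound tasks can be called directly; self is supplied automatically
        mock_retry.assert_called_once_with(countdown=5)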

This, of course, only exercises the logic of the method, not Celery itself. I usually also set up one or two integration (collaboration) tests that set ALWAYS_EAGER so they run through the Celery code, even though Celery will execute everything eagerly in memory.


