Is there a way for workers in multiprocessing.Pool apply_async to catch errors and continue?

When using multiprocessing.Pool's apply_async(), what happens when the code breaks? I think this only covers exceptions, but there may be other things that cause worker functions to fail.

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
    pool.apply_async(workerfunct, args=args, callback=callbackfunct)

      

As I understand it now, the process/worker that hits the error stops (all other processes continue), and anything after the raised error is not executed, EVEN if I catch the error with try/except.

For example, I would normally catch the error, supply a default value and/or print the error message, and let the code continue. If my callback function writes to a file, the write would then be done with the default values.

Someone who answered another question wrote a little about this:

I suspect that the reason you don't see anything happening with your example code is because all calls to your worker function fail. If the worker function fails, the callback will never be executed. A failure is not reported at all unless you try to fetch the result from the AsyncResult object returned by the apply_async call. However, since you are not saving any of these objects, you will never know that a crash has occurred. If I were you, I would try to use pool.apply while testing so that you see errors as soon as they appear.
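A minimal sketch of what that quote describes, not code from the original post: the worker `risky` and the helper `get_or_default` are hypothetical names. It keeps each AsyncResult and calls get() on it, which is the point where a worker's exception is re-raised in the parent process.

```python
import multiprocessing as mp


def risky(x):
    # Hypothetical worker: fails for one particular input.
    if x == 2:
        raise ValueError("bad input: %r" % x)
    return x * 10


def get_or_default(async_result, default, timeout=30):
    # AsyncResult.get() re-raises the worker's exception in the parent
    # (or raises a timeout error), so the failure becomes visible here.
    try:
        return async_result.get(timeout=timeout)
    except Exception as exc:
        print("worker failed:", repr(exc))
        return default


def main():
    with mp.Pool(2) as pool:
        # Keep every AsyncResult instead of discarding it.
        pending = [pool.apply_async(risky, args=(x,)) for x in range(4)]
        values = [get_or_default(r, default=-1) for r in pending]
    print(values)


if __name__ == "__main__":
    main()
```

The get() calls happen inside the `with` block on purpose, since leaving the block terminates the pool.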



1 answer


If you are using Python 3.2+, you can use the error_callback keyword argument to handle exceptions raised by workers.

pool.apply_async(workerfunct, args=args, callback=callbackfunct, error_callback=handle_error)

      

handle_error will be called with the exception object as its argument.
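As a hedged, self-contained sketch of the error_callback approach: the bodies of `workerfunct`, `callbackfunct` and `handle_error` below are assumptions for illustration, as are the `results` and `errors` lists. Both callbacks run in the parent process, so they can append to ordinary lists.

```python
import multiprocessing as mp

results = []  # filled in by the success callback
errors = []   # filled in by the error callback


def workerfunct(x):
    # Hypothetical worker: raises ZeroDivisionError for x == 0.
    return 10 / x


def callbackfunct(value):
    # Called in the parent process when the worker returns normally.
    results.append(value)


def handle_error(exc):
    # Called in the parent process with the exception the worker raised.
    errors.append(exc)


def main():
    pool = mp.Pool(2)
    for x in (0, 1, 2):
        pool.apply_async(workerfunct, args=(x,),
                         callback=callbackfunct,
                         error_callback=handle_error)
    pool.close()
    pool.join()  # wait for every task, so the callbacks have run
    print("results:", sorted(results))
    print("errors:", [repr(e) for e in errors])


if __name__ == "__main__":
    main()
```

For any given task only one of the two callbacks fires, so the failing input reaches handle_error while the other inputs still reach callbackfunct.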

If you are on an older version, you must wrap all your worker functions in try/except to ensure the callback is executed. (I think you got the impression that this won't work from my answer to that other question, but that is not the case. Sorry!):



def workerfunct(*args):
    try:
        pass  # Stuff
    except Exception as e:
        # Do something here; returning e passes it on to the callback
        return e

pool.apply_async(workerfunct, args=args, callback=callbackfunct)

      

You can also use a wrapper function if you can't / don't want to change the function you actually want to call:

def wrapper(func, *args):
    try:
        return func(*args)
    except Exception as e:
        return e

pool.apply_async(wrapper, args=(workerfunct, *args), callback=callbackfunct) 
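Because the wrapper returns the exception instead of raising it, the callback always runs and has to tell a real result from an exception. A minimal sketch of one way to do that and substitute a default, as the question asks: `DEFAULT`, `collected`, and the bodies of `workerfunct` and `callbackfunct` are hypothetical and not part of the original answer.

```python
import multiprocessing as mp

DEFAULT = -1    # hypothetical fallback value
collected = []  # what the callback ends up recording


def workerfunct(x):
    # Hypothetical worker that fails on negative input.
    if x < 0:
        raise ValueError("negative input: %r" % x)
    return x * 2


def wrapper(func, *args):
    # Same wrapper as above: return the exception instead of raising it.
    try:
        return func(*args)
    except Exception as e:
        return e


def callbackfunct(result):
    # Runs in the parent for every task, successful or not.
    if isinstance(result, Exception):
        print("worker failed:", repr(result))
        result = DEFAULT
    collected.append(result)


def main():
    pool = mp.Pool(2)
    for x in (1, -1, 3):
        pool.apply_async(wrapper, args=(workerfunct, x),
                         callback=callbackfunct)
    pool.close()
    pool.join()  # wait until all callbacks have run
    print(sorted(collected))


if __name__ == "__main__":
    main()
```

One caveat with this pattern: the returned exception travels back to the parent by pickling, so it needs to be a picklable exception type.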

      
