Is there a way for workers in multiprocessing.Pool's apply_async to catch errors and continue?
When using multiprocessing.Pool's apply_async(), what happens when the worker code breaks? I think this mostly means exceptions, but there may be other things that cause worker functions to fail.
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
    pool.apply_async(workerfunct, args=(f,), callback=callbackfunct)  # args must be a tuple
As I understand it, the process/worker goes down (all the other processes continue) and nothing after the raised error is executed, EVEN if I catch the error with try/except.
Generally, I would like to catch errors, supply a default value and/or print the error message, and have the code continue. If my callback function writes to a file, that write should then happen with the default values.
Another answer touched on this briefly:
I suspect that the reason you don't see anything happening with your example code is because all calls to your worker function fail. If the worker function fails, the callback will never be executed. A failure is not reported at all unless you try to fetch the result from the AsyncResult object returned by the apply_async call. However, since you are not saving any of these objects, you will never know that a crash has occurred. If I were you, I would try to use pool.apply while testing so that you see errors as soon as they appear.
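A minimal sketch of the point made in that quote, assuming a hypothetical worker `work` that fails for one input: if you keep the `AsyncResult` objects returned by `apply_async` and call `.get()` on each one, the worker's exception is re-raised in the parent process, so failures are no longer silent. The names `work` and `collect` and the two-process pool are illustrative only.

```python
import multiprocessing as mp


def work(x):
    # Hypothetical worker: fails for one input to simulate a crash.
    if x == 3:
        raise ValueError(f"bad input: {x}")
    return x * x


def collect(inputs):
    """Submit every input, keep the AsyncResult objects, then fetch each result."""
    results, errors = {}, {}
    # Leaving the with block calls pool.terminate(), so every .get() happens inside it.
    with mp.Pool(2) as pool:
        pending = {x: pool.apply_async(work, (x,)) for x in inputs}
        for x, async_result in pending.items():
            try:
                # .get() re-raises the exception that was raised in the worker.
                results[x] = async_result.get(timeout=10)
            except Exception as e:
                errors[x] = e
    return results, errors


if __name__ == "__main__":
    ok, failed = collect(range(5))
    print(ok)      # {0: 0, 1: 1, 2: 4, 4: 16}
    print(failed)  # {3: ValueError('bad input: 3')}
```

Because `.get()` blocks until that particular task is done, this also waits for every task; the trade-off is that results are gathered in submission order rather than as they finish.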
If you are using Python 3.2+, you can use the error_callback keyword argument to handle exceptions raised by workers.
pool.apply_async(workerfunct, args=(f,), callback=callbackfunct, error_callback=handle_error)
handle_error will be called with the exception object as its argument.
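A rough sketch of the error_callback approach, assuming a reasonably recent Python 3 and a hypothetical `workerfunct` that raises for negative inputs. Successful results go to `callbackfunct`, the raised exception goes to `handle_error`, and the remaining tasks carry on. Both callbacks run in a result-handling thread of the parent process, so they should return quickly. `pool.close()` followed by `pool.join()` waits until every task and its callback has finished.

```python
import multiprocessing as mp


def workerfunct(x):
    # Hypothetical worker: raises for negative input.
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x * 10


def run(inputs):
    results, errors = [], []

    # Callbacks are not sent to the workers; they run in the parent process,
    # so plain closures are fine here.
    def callbackfunct(result):
        results.append(result)

    def handle_error(exc):
        # Receives the exception instance raised inside the worker.
        print(f"worker failed: {exc!r}")
        errors.append(exc)

    with mp.Pool(2) as pool:
        for x in inputs:
            pool.apply_async(workerfunct, args=(x,),
                             callback=callbackfunct,
                             error_callback=handle_error)
        pool.close()
        pool.join()  # every task has finished and its callback has run
    return sorted(results), errors


if __name__ == "__main__":
    print(run([1, -2, 3]))
```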
If you aren't, you must wrap the body of each worker function in try/except to make sure the callback is executed. (I think you got the impression from my answer to that other question that this won't work, but that isn't the case. Sorry!):
def workerfunct(*args):
    try:
        ...  # Stuff
    except Exception as e:
        # Do something here, maybe return e?
        return e
pool.apply_async(workerfunct, args=(f,), callback=callbackfunct)
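A runnable sketch of the try/except variant that also matches the question's goal of printing the error and passing a default to the callback. The `int()` conversion stands in for the real work, and `DEFAULT = None` is a hypothetical fallback value. In the question the callback would write to a file; here it just collects the results.

```python
import multiprocessing as mp

DEFAULT = None  # hypothetical default value handed to the callback on failure


def workerfunct(item):
    try:
        # Stuff: hypothetical work that may raise, here converting to int.
        value = int(item)
    except Exception as e:
        # Print the error and supply a default instead of letting it propagate,
        # so the callback is still executed for this task.
        print(f"{item!r} failed: {e!r}")
        value = DEFAULT
    return item, value


def main(items):
    collected = []

    def callbackfunct(result):
        # Runs in the parent for every task, including the failed ones.
        collected.append(result)

    with mp.Pool(2) as pool:
        for item in items:
            pool.apply_async(workerfunct, args=(item,), callback=callbackfunct)
        pool.close()
        pool.join()
    # Callbacks arrive in completion order, so sort for a stable result.
    return sorted(collected, key=lambda pair: pair[0])


if __name__ == "__main__":
    print(main(["1", "x", "3"]))  # → [('1', 1), ('3', 3), ('x', None)]
```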
You can also use a wrapper function if you can't or don't want to change the function you actually want to call:
def wrapper(func, *args):
    try:
        return func(*args)
    except Exception as e:
        return e
pool.apply_async(wrapper, args=(workerfunct, *args), callback=callbackfunct)
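A sketch of the wrapper in use, with a hypothetical `workerfunct` that divides two numbers. Since `wrapper` returns the exception instead of raising it, the callback receives either a normal result or an exception object and has to tell them apart with `isinstance`.

```python
import multiprocessing as mp


def wrapper(func, *args):
    # Defined at module level so it can be pickled and sent to the workers.
    try:
        return func(*args)
    except Exception as e:
        return e


def workerfunct(a, b):
    # Hypothetical worker that the caller cannot or does not want to change.
    return a / b


def run(pairs):
    successes, failures = [], []

    def callbackfunct(result):
        # The wrapper returns the exception instead of raising it,
        # so the callback has to distinguish the two cases.
        if isinstance(result, Exception):
            failures.append(result)
        else:
            successes.append(result)

    with mp.Pool(2) as pool:
        for a, b in pairs:
            pool.apply_async(wrapper, args=(workerfunct, a, b), callback=callbackfunct)
        pool.close()
        pool.join()
    return sorted(successes), failures


if __name__ == "__main__":
    print(run([(6, 3), (1, 0), (8, 2)]))
```

One caveat of this design: a function that legitimately returns an exception instance would be indistinguishable from a failure.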