Python: some multiprocessing jobs do not complete
I am using Python 2.7 with multiprocessing.Pool to execute a job in parallel.
I've simplified the example below, but here's the gist of it.
It creates a file for each person in my dict using apply_async(). However, when I check whether each file was created, I notice that sometimes a file is missing.
I think I'm doing something wrong in the way I use multiprocessing.Pool.
Any advice?
import os
from multiprocessing import Pool

def outputFile(person):
    ofh = open(person + '.txt', 'w')
    ofh.write('test\n')
    ofh.close()

# person_dict is defined elsewhere (omitted in this simplified example)
pool = Pool(processes=4)
for person in person_dict:
    pool.apply_async(outputFile, args=(person,))
pool.close()
pool.join()

for person in person_dict:
    print os.path.isfile(person + '.txt')

Output
True
True
False
True
Could this be related to the content of person_dict?
I modified your code and ran it several times, and every run gave the expected results.
Here is the code I modified and tested:
import os
from multiprocessing import Pool

def outputfile(person):
    with open(person + '.txt', 'w') as ofh:
        ofh.write('test\n')

person_dict = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
pool = Pool(processes=4)
for person in person_dict:
    # args must be a tuple: note the trailing comma in (person,)
    pool.apply_async(outputfile, (person,))
pool.close()
pool.join()

for person in person_dict:
    print(os.path.isfile(person + '.txt'))
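A note on the args argument: apply_async() expects args to be a tuple, and the pool worker calls func(*args, **kwds). Writing (person) without a trailing comma gives just the string itself, which then gets unpacked character by character. With one-letter names like the keys above the mistake goes unnoticed, but a name such as 'alice' would call the function with five arguments, raise a TypeError inside the worker and, like any other worker exception, go unreported. A minimal sketch of the unpacking, without a pool; output_file is a hypothetical stand-in for the worker function:

```python
from __future__ import print_function


def output_file(person):
    # Hypothetical stand-in for the worker: takes exactly one argument.
    return person + '.txt'


good_args = ('alice',)  # one-element tuple
bad_args = ('alice')    # parentheses only group: this is the str 'alice'

print(type(good_args).__name__, type(bad_args).__name__)  # tuple str

# A pool worker calls func(*args, **kwds), which behaves like this:
print(output_file(*good_args))  # alice.txt
try:
    output_file(*bad_args)  # same as output_file('a', 'l', 'i', 'c', 'e')
except TypeError as exc:
    print('TypeError:', exc)
```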
If you don't catch exceptions in the worker processes and print them yourself, you won't see them. For example, the following program produces no output at all, even though every task raises an exception:
import os
from multiprocessing import Pool

def outputFile(person):
    raise Exception("An exception")

pool = Pool(processes=4)
for person in range(100):
    pool.apply_async(outputFile, args=(person,))
pool.close()
pool.join()
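The exceptions are not lost inside the pool machinery itself: apply_async() returns an AsyncResult, an exception raised in the worker is stored in it, and it is re-raised in the parent when you call that result's get() method. The program above never calls get(), so nothing is reported. The sketch below keeps the results and checks each one. The names output_file and run_jobs and the temporary-directory paths are illustrative assumptions, and the nonexistent subdirectory is there only to make open() fail inside a worker. It is written to run on both Python 2.7 and 3.

```python
from __future__ import print_function
import os
import tempfile
from multiprocessing import Pool


def output_file(path):
    # Same work as outputFile in the question, but takes a full path.
    with open(path, 'w') as ofh:
        ofh.write('test\n')
    return path


def run_jobs(paths, processes=4):
    """Submit one job per path; return (path, exception or None) pairs."""
    pool = Pool(processes=processes)
    # Keep the AsyncResult objects instead of discarding them.
    results = [(p, pool.apply_async(output_file, args=(p,))) for p in paths]
    pool.close()
    pool.join()
    outcomes = []
    for p, res in results:
        try:
            res.get()  # re-raises the worker's exception, if there was one
            outcomes.append((p, None))
        except Exception as exc:
            outcomes.append((p, exc))
    return outcomes


if __name__ == '__main__':
    tmp = tempfile.mkdtemp()
    paths = [os.path.join(tmp, name + '.txt') for name in ('alice', 'bob')]
    # This subdirectory does not exist, so open() fails inside the worker.
    paths.append(os.path.join(tmp, 'no_such_dir', 'carol.txt'))
    for p, exc in run_jobs(paths):
        print(p, 'OK' if exc is None else 'FAILED: %r' % (exc,))
```

Because get() is called in the parent, the error surfaces with its original exception type, so you can report the path of each file that was not created.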
You need to catch all exceptions and manually print the traceback:
import os
from multiprocessing import Pool, Lock
import traceback

print_lock = Lock()  # created before the Pool, so forked workers (Unix) share it

def outputFile(person):
    try:
        raise Exception("An exception")
    except Exception:
        # Hold the lock so tracebacks from different workers don't interleave
        with print_lock:
            print "%s: An exception occurred" % person
            print traceback.format_exc()

pool = Pool(processes=4)
for person in range(100):
    pool.apply_async(outputFile, args=(person,))
pool.close()
pool.join()
Output
0: An exception occurred
Traceback (most recent call last):
File "person.py", line 9, in outputFile
raise Exception("An exception")
Exception: An exception
1: An exception occurred
Traceback (most recent call last):
File "person.py", line 9, in outputFile
raise Exception("An exception")
Exception: An exception
...
99: An exception occurred
Traceback (most recent call last):
File "person.py", line 9, in outputFile
raise Exception("An exception")
Exception: An exception
Note: print_lock is used to prevent the output of different processes from interleaving.
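If you would rather not print from the workers at all, one variation is to catch the exception in the worker and return the formatted traceback to the parent, which then does all the printing. No lock is needed, because only one process writes to stdout. Here is a minimal sketch under that assumption. output_file, safe_output_file and run_all are hypothetical names, and the worker always fails, as in the example above.

```python
from __future__ import print_function
import traceback
from multiprocessing import Pool


def output_file(person):
    # Illustrative worker that always fails, like the example above.
    raise Exception("An exception")


def safe_output_file(person):
    """Run output_file; return (person, None) or (person, traceback text)."""
    try:
        output_file(person)
        return person, None
    except Exception:
        # Format the traceback here in the worker: traceback objects
        # cannot be pickled and sent back to the parent, but strings can.
        return person, traceback.format_exc()


def run_all(people, processes=4):
    pool = Pool(processes=processes)
    results = [pool.apply_async(safe_output_file, args=(p,)) for p in people]
    pool.close()
    pool.join()
    return [r.get() for r in results]


if __name__ == '__main__':
    # Only the parent prints, so output from different jobs cannot interleave.
    for person, tb in run_all(range(3)):
        if tb is not None:
            print("%s: An exception occurred" % person)
            print(tb)
```

Because results are collected in submission order, the reports also come out in order (0, 1, 2) rather than in whatever order the workers happen to finish.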