Python multiprocessing module: joining processes with a timeout
I'm optimizing the parameters of a complex simulation, and I'm using the multiprocessing module to improve the performance of the optimization algorithm. I learned the basics of multiprocessing at http://pymotw.com/2/multiprocessing/basics.html . Depending on the parameters chosen by the optimization algorithm, a simulation takes between 1 and 5 minutes. If the parameters are chosen badly, the simulation can take 30 minutes or more and the results are not useful. So I was thinking about adding a timeout to the multiprocessing that terminates every simulation lasting longer than a defined amount of time. Here is an abstract version of the problem:
import numpy as np
import time
import multiprocessing

def worker(num):
    time.sleep(np.random.random()*20)

def main():
    pnum = 10
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,),
                                    name='process_' + str(i+1))
        procs.append(p)
        p.start()
        print 'starting', p.name
    for p in procs:
        p.join(5)
        print 'stopping', p.name

if __name__ == "__main__":
    main()
The line p.join(5) specifies a timeout of 5 seconds. Because of the loop for p in procs:, the program waits up to 5 seconds for the first process to finish, then up to another 5 seconds for the second one, and so on. What I want instead is for the program to terminate every process that has been running for more than 5 seconds in total. Also, if none of the processes runs longer than 5 seconds, the program should not wait the full 5 seconds.
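In other words, I think what I need is a single shared deadline for the whole batch instead of a fresh 5 seconds per process. A rough, untested sketch of the behaviour I have in mind (reusing procs from above):

deadline = time.time() + 5
for p in procs:
    # Wait only for whatever is left of the shared deadline;
    # join(0) returns immediately, so we never overshoot it.
    p.join(max(0, deadline - time.time()))
for p in procs:
    if p.is_alive():
        # Still running after the deadline, so kill it.
        p.terminate()
        p.join()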
You can do this by creating a loop that waits up to some timeout in seconds, frequently checking whether all the processes have terminated. If they haven't all finished within the allotted amount of time, terminate all of them:
TIMEOUT = 5
start = time.time()
while time.time() - start <= TIMEOUT:
    if any(p.is_alive() for p in procs):
        time.sleep(.1)  # Just to avoid hogging the CPU
    else:
        # All the processes are done, break now.
        break
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()
If you want to kill all the processes, you can use the Pool from multiprocessing. You will then need to define a total timeout for the entire execution rather than individual timeouts:
import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():
    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print pool_result.get(timeout=1)

if __name__ == "__main__":
    main()
This will give you a list with the return values for all of your workers, in order.
More info here:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
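Note that if the timeout expires before all workers have finished, pool_result.ready() is False and the code above simply skips the results. A sketch of how the timeout case could be handled, assuming you want to discard everything once the total deadline has passed:

if pool_result.ready():
    print pool_result.get(timeout=1)
else:
    # Deadline passed: kill the remaining workers and reap them.
    pool.terminate()
    pool.join()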
Thanks to dano's help, I found a solution:
import numpy as np
import time
import multiprocessing

def worker(num):
    time.sleep(np.random.random()*20)

def main():
    pnum = 10
    TIMEOUT = 5
    procs = []
    bool_list = [True]*pnum
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,),
                                    name='process_' + str(i+1))
        procs.append(p)
        p.start()
        print 'starting', p.name

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
        print bool_list
        if np.any(bool_list):
            time.sleep(.1)
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()

    for p in procs:
        print 'stopping', p.name, '=', p.is_alive()
        p.join()

if __name__ == "__main__":
    main()
This is not the most elegant way; I am sure there is a better way than using bool_list. Processes that are still alive after the 5 second timeout are killed. If the sleep in the worker function is shorter than the timeout, you will see the program finish before the 5 second timeout is reached. I'm still open to more elegant solutions, if there are any :)
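For what it's worth, a sketch of one such cleanup: the bool_list bookkeeping can be replaced by a generator expression, and the whole pattern wrapped in a helper (join_with_timeout is a made-up name, not part of the multiprocessing API):

def join_with_timeout(procs, timeout):
    # Hypothetical helper: poll until all processes are dead
    # or the shared deadline passes, then kill the stragglers.
    deadline = time.time() + timeout
    while time.time() < deadline and any(p.is_alive() for p in procs):
        time.sleep(.1)
    for p in procs:
        if p.is_alive():
            print 'timed out, terminating', p.name
            p.terminate()
        p.join()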