Execute process list without multiprocessing pool map
import multiprocessing as mp

if __name__ == '__main__':
    # pool = mp.Pool(M)
    p1 = mp.Process(target=target1, args=(arg1,))
    p2 = mp.Process(target=target2, args=(arg2,))
    ...
    p9 = mp.Process(target=target9, args=(arg9,))
    p10 = mp.Process(target=target10, args=(arg10,))
    ...
    pN = mp.Process(target=targetN, args=(argN,))
    processList = [p1, p2, ..., p9, p10, ..., pN]
I have N different target functions that take unequal, non-trivial amounts of time to execute.
I'm looking for a way to run them in parallel so that M (1 < M < N) processes run concurrently, and as soon as one process finishes, the next process from the list starts, until every process in processList has completed.
Since I am not calling the same target function for each process, I could not use Pool.
I decided to do something like this:
for i in range(0, N, M):
    limit = i + M
    if limit > N:
        limit = N
    for p in processList[i:limit]:
        p.start()
    for p in processList[i:limit]:
        p.join()
Since my target functions take unequal amounts of time to execute, this method is inefficient: each batch waits for its slowest process before the next batch can start.
Any suggestions? Thanks in advance.
EDIT: The title of the question has been changed to "Run a process list without a multiprocessor pool map" from "Run a list of processes without a multiprocessor pool".
Here's a way to do it in Python 3.4, which can be adapted for Python 2.7:
import concurrent.futures

targets_with_args = [
    (target1, arg1),
    (target2, arg2),
    (target3, arg3),
    ...
]

with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor:
    futures = [executor.submit(target, arg) for target, arg in targets_with_args]
    results = [future.result() for future in concurrent.futures.as_completed(futures)]
You can use a Pool with apply_async:
#!/usr/bin/env python
# coding=utf-8
from multiprocessing import Pool
import random
import time

def target_1():
    time.sleep(random.uniform(0.5, 2))
    print('done target 1')

def target_2():
    time.sleep(random.uniform(0.5, 2))
    print('done target 2')

def target_3():
    time.sleep(random.uniform(0.5, 2))
    print('done target 3')

def target_4():
    time.sleep(random.uniform(0.5, 2))
    print('done target 4')

pool = Pool(2)  # at most two processes at a time
pool.apply_async(target_1)
pool.apply_async(target_2)
pool.apply_async(target_3)
pool.apply_async(target_4)
pool.close()
pool.join()
The pool is designed for exactly what you need: performing many tasks in a limited number of processes.
I also suggest you take a look at the concurrent.futures library and its backport to Python 2.7. It has a ProcessPoolExecutor with roughly the same capabilities, but its methods return Future objects, which have a more convenient API.
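To make the Future API concrete, here is a minimal sketch; the worker functions square and cube are made-up placeholders, not part of the original answer:

```python
import concurrent.futures

def square(x):
    # trivial stand-in for a long-running target function
    return x * x

def cube(x):
    return x ** 3

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        f1 = executor.submit(square, 4)
        f2 = executor.submit(cube, 3)
        # each submit() returns a Future, which exposes result(),
        # done(), exception(), and add_done_callback()
        print(f1.result())  # blocks until square(4) finishes -> 16
        print(f2.result())  # -> 27
```

The executor handles the "start the next task as soon as a worker frees up" logic for you, which is exactly the scheduling the question asks for.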
I would use a Queue, adding processes to it from processList, and once a process finishes, removing it from the queue and adding another one.
The pseudo code will look like this:
from Queue import Queue

q = Queue(m)
# add first process to queue
i = 0
q.put(processList[i])
processList[i].start()
i += 1
while not q.empty():
    p = q.get()
    # check if the process is finished; if not, return it to the queue for later checking
    if p.is_alive():
        q.put(p)
    # add another process if there is space and there are more processes to add
    if not q.full() and i < len(processList):
        q.put(processList[i])
        processList[i].start()
        i += 1
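A runnable Python 3 version of that polling loop might look like the following; the names run_throttled and work are illustrative, with work standing in for the real target functions:

```python
import multiprocessing as mp
import time
from queue import Queue

def work(n):
    # stand-in target: simulate unequal run times
    time.sleep(0.05 * n)

def run_throttled(processes, m):
    """Run the given (unstarted) processes with at most m alive at once."""
    q = Queue(m)
    i = 0
    # seed the queue with the first process
    q.put(processes[i])
    processes[i].start()
    i += 1
    while not q.empty():
        p = q.get()
        if p.is_alive():
            q.put(p)  # still running; re-check later
        # start another process if there is a free slot and work remains
        if not q.full() and i < len(processes):
            q.put(processes[i])
            processes[i].start()
            i += 1
        time.sleep(0.01)  # avoid a hot busy-wait

if __name__ == '__main__':
    procs = [mp.Process(target=work, args=(n,)) for n in (3, 1, 2, 1)]
    run_throttled(procs, 2)
    print('all finished')
```

Note that this loop polls is_alive() rather than blocking, so it trades some CPU for simplicity compared to a Pool.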
A simple solution would be to wrap the target functions {1, 2, ..., N} in a single function forward_to_target, which dispatches to the corresponding target function {1, 2, ..., N} according to the argument. If you cannot infer the appropriate target function from the argument itself, replace each argument with a tuple (argX, X); then, in forward_to_target, unpack the tuple and dispatch to the function denoted by X.
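A sketch of that idea is below; forward_to_target, target1, and target2 are placeholder names for your real functions. Each work item carries an index X that selects the function, so a plain Pool.map can then distribute the work across M processes:

```python
from multiprocessing import Pool

# placeholder target functions; substitute your own target1..targetN
def target1(arg):
    return 'target1:' + arg

def target2(arg):
    return 'target2:' + arg

TARGETS = {1: target1, 2: target2}

def forward_to_target(packed):
    # unpack (argX, X) and dispatch to the function denoted by X
    arg, x = packed
    return TARGETS[x](arg)

if __name__ == '__main__':
    work = [('a', 1), ('b', 2), ('c', 1)]
    pool = Pool(2)  # at most M=2 worker processes at a time
    print(pool.map(forward_to_target, work))
    # -> ['target1:a', 'target2:b', 'target1:c']
    pool.close()
    pool.join()
```

Pool.map then gives you the throttling to M concurrent processes for free, with results returned in input order.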
You can have two lists, one of targets and one of arguments, zip the two together, and send the pairs to a runner function (here run_target_on_args):
#!/usr/bin/env python
import multiprocessing as mp

# target functions
targets = [len, str, len, zip]
# arguments for each function
args = [["arg1"], ["arg2"], ["arg3"], [["arg5"], ["arg6"]]]

# applies a target function to its arguments
def run_target_on_args(target_args):
    return target_args[0](*target_args[1])

pool = mp.Pool()
print pool.map(run_target_on_args, zip(targets, args))