Execute a process list without multiprocessing Pool.map

import multiprocessing as mp

if __name__ == '__main__':

    #pool = mp.Pool(M)

    p1 = mp.Process(target=target1, args=(arg1,))
    p2 = mp.Process(target=target2, args=(arg2,))
    ...
    p9 = mp.Process(target=target9, args=(arg9,))
    p10 = mp.Process(target=target10, args=(arg10,))
    ...
    pN = mp.Process(target=targetN, args=(argN,))

    processList = [p1, p2, ..., p9, p10, ..., pN]

I have N different target functions, each of which takes an unequal, non-trivial amount of time to execute.

I'm looking for a way to run them in parallel so that M (1 < M < N) processes run concurrently, and as soon as one process finishes, the next one from the list starts, until all processes in processList have completed.

Since I am not calling the same target function for every task, I could not use Pool.

I decided to do something like this:

    for i in range(0, N, M):
        limit = min(i + M, N)
        # start a batch of at most M processes ...
        for p in processList[i:limit]:
            p.start()
        # ... and wait for the whole batch to finish
        for p in processList[i:limit]:
            p.join()

Since my target functions take unequal amounts of time to execute, this method is inefficient: each batch of M processes has to wait for its slowest member before the next batch can start, which can leave up to M - 1 workers idle.

Any suggestions? Thanks in advance.

EDIT: The title of the question has been changed to "Run a process list without a multiprocessor pool map" from "Run a list of processes without a multiprocessor pool".

+3




5 answers


Here's a way to do it in Python 3.4, which can be adapted for Python 2.7:



import concurrent.futures

targets_with_args = [
    (target1, arg1),
    (target2, arg2),
    (target3, arg3),
    ...
]

with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor:
    futures = [executor.submit(target, arg) for target, arg in targets_with_args]
    results = [future.result() for future in concurrent.futures.as_completed(futures)]
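Note that as_completed yields futures in the order they finish, so results is in completion order rather than submission order; if you need results in submission order, call future.result() on the futures list directly instead.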

+3




You can use apply_async on a Pool:

#!/usr/bin/env python
# coding=utf-8

from multiprocessing import Pool
import random
import time


def target_1():
    time.sleep(random.uniform(0.5, 2))
    print('done target 1')

def target_2():
    time.sleep(random.uniform(0.5, 2))
    print('done target 2')

def target_3():
    time.sleep(random.uniform(0.5, 2))
    print('done target 3')

def target_4():
    time.sleep(random.uniform(0.5, 2))
    print('done target 4')


if __name__ == '__main__':
    pool = Pool(2)  # at most two processes at a time
    pool.apply_async(target_1)
    pool.apply_async(target_2)
    pool.apply_async(target_3)
    pool.apply_async(target_4)
    pool.close()
    pool.join()



A pool is built for exactly what you need to do: run many tasks in a limited number of processes.
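If your targets return values, each apply_async call immediately returns an AsyncResult whose get() blocks until that particular task has finished. A minimal sketch (the work function and its arguments are made-up placeholders):

from multiprocessing import Pool
import time

def work(x):  # hypothetical placeholder target
    time.sleep(x)
    return x * 2

if __name__ == '__main__':
    pool = Pool(2)
    # submit everything up front; the pool runs at most 2 at a time
    async_results = [pool.apply_async(work, (x,)) for x in [1, 0.5, 2]]
    pool.close()
    print([r.get() for r in async_results])  # [2, 1.0, 4]
    pool.join()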

I also suggest you take a look at the concurrent.futures library (and its backport to Python 2.7). It has ProcessPoolExecutor, which offers roughly the same capabilities, but its methods return Future objects, which make for a more convenient API.

+4




I would use a Queue: add processes to it from processList, and once a process finishes, remove it from the queue and add another one. The pseudo code would look like this:

from Queue import Queue  # "queue" in Python 3
q = Queue(M)

# add the first process to the queue and start it
i = 0
q.put(processList[i])
processList[i].start()
i += 1

while not q.empty():
    p = q.get()

    # if the process has not finished yet, put it back
    # in the queue for later checking
    if p.is_alive():
        q.put(p)

    # start another process if there is room in the queue
    # and there are more processes to add
    if not q.full() and i < len(processList):
        q.put(processList[i])
        processList[i].start()
        i += 1
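A runnable variant of the same bookkeeping, using a plain list of active processes and a short sleep instead of a tight polling loop (processList and M are taken from the question; the 0.1s interval is an arbitrary choice):

import time

def run_limited(processList, M):
    # run the processes in processList with at most M alive at once
    pending = list(processList)
    active = []
    while pending or active:
        # drop processes that have finished
        active = [p for p in active if p.is_alive()]
        # top the active set back up to M
        while pending and len(active) < M:
            p = pending.pop(0)
            p.start()
            active.append(p)
        time.sleep(0.1)  # avoid spinning at 100% CPU
    for p in processList:
        p.join()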

+1




A simple solution would be to wrap the target functions {1, 2, ..., N} in a single function, forward_to_target, which dispatches to the corresponding target function according to its argument. If you cannot deduce the appropriate target function from the argument itself, replace each argument with a tuple (argX, X); then, inside forward_to_target, unpack the tuple and call the function denoted by X. With a single target function, an ordinary Pool.map works again, as in the sketch below.
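A minimal sketch of that idea, assuming two placeholder targets and the (argX, X) tuple encoding described above:

from multiprocessing import Pool

def target1(arg):  # hypothetical placeholder targets
    return 'target1(%s)' % arg

def target2(arg):
    return 'target2(%s)' % arg

TARGETS = {1: target1, 2: target2}

def forward_to_target(packed):
    # unpack (arg, index) and dispatch to the function denoted by the index
    arg, index = packed
    return TARGETS[index](arg)

if __name__ == '__main__':
    jobs = [('a', 1), ('b', 2), ('c', 1)]
    with Pool(2) as pool:
        print(pool.map(forward_to_target, jobs))
    # ['target1(a)', 'target2(b)', 'target1(c)']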

0




You can have two lists, targets and arguments, zip the two together, and send each pair to a runner function (here run_target_on_args):

#!/usr/bin/env python
# Python 2

import multiprocessing as mp

# target functions
targets = [len, str, len, zip]

# arguments for each target function
args = [["arg1"], ["arg2"], ["arg3"], [["arg5"], ["arg6"]]]

# apply a target function to its arguments
def run_target_on_args(target_args):
    return target_args[0](*target_args[1])

if __name__ == '__main__':
    pool = mp.Pool()
    print pool.map(run_target_on_args, zip(targets, args))
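The wrapper is needed because Pool.map maps a single function over an iterable; packing each (target, args) pair into the item itself turns N different calls into one uniform call, so the usual pool machinery applies.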

0

