Stop multiprocessing from creating new processes when the task is finished

Hello, I have a multiprocessing program like this:

# this is pseudocode-ish
import multiprocessing

def worker(queue, context):
    set_context(context)  # set global context within worker
    while queue.qsize() > 0:
        process(queue.get(False))

pool = multiprocessing.Pool(20, worker, (queue, global_context))
pool.close()
pool.join()


The problem is that the global context is a very heavy object, so it takes some time for each individual process to spawn (pickling / unpickling). What I found is that, for shorter queues, the entire queue is processed by the first couple of spawned processes, and then the rest of the program is stuck spawning the remaining processes, which end up doing nothing because there is nothing left in the queue. For example, each process takes 1 second to spawn, but the queue can be processed in 2 seconds, so the first two processes finish the queue in 2-3 seconds, and then the program spends another 17 seconds spawning the remaining processes.

Is there a way to kill the rest of the processes when the queue is empty? Or a more flexible way to set the number of processes in the pool, for example, only starting another process when it is needed?

Thanks



1 answer


It is not possible to start a process on the fly with multiprocessing.Pool. You will need to modify it yourself if you want this type of behavior.

One way to shut the pool down is to use the method multiprocessing.Pool.terminate. But it will probably wait for all the workers to finish their initialization.

You can also kill all the workers directly once your job is complete. I think the _pool field contains all the worker Process objects, which you can forcibly terminate. Note that this can lead to some strange behavior, as the pool is not intended to be manipulated from the outside. You have to make sure that you clean up all the management threads correctly, which can be tricky.

Your design choice is quite unusual. You are duplicating the call_queue. Indeed, a Pool is supposed to take care of this on its own, and you do not need an additional queue. If all the tasks are in task_list and need to be processed with process_task, you can do something like



# this is pseudocode-ish
import multiprocessing

def init(context):
    set_context(context)  # set global context within worker

pool = multiprocessing.Pool(20, init, (global_context,))
res = pool.map(process_task, task_list)
pool.terminate()
pool.join()


This way you don't break the Pool setup, and it is probably more efficient.

Finally, if you intend to reuse your pool multiple times and your global_context does not change, you can use loky. (DISCLAIMER: I am one of the developers of this project.) It allows you to reuse the worker pool multiple times in a program without setting everything up again. One issue is that there is no initializer, as it follows the concurrent.futures API, but an initializer can be emulated using a multiprocessing.Barrier and submitting max_workers initializer jobs. This ensures that each initializer job is run by a single worker and that all workers run the initializer.
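The barrier trick can be sketched as follows. This is only an illustration under some assumptions: it uses the standard library's concurrent.futures.ProcessPoolExecutor instead of loky so that it runs without extra packages, and it uses a Manager().Barrier() proxy because a proxy can be sent to workers as a task argument. The names init_job, process_task, run and the small dict standing in for the heavy context are hypothetical, and whether the same code works unchanged with loky is not verified here.

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor

_CONTEXT = None  # per-worker global, filled in by init_job


def init_job(barrier, context):
    """Emulated initializer: store the context, then wait for the other workers."""
    global _CONTEXT
    _CONTEXT = context
    # A worker blocked here cannot pick up a second init job, so the
    # n_workers init jobs end up on n_workers distinct workers.
    barrier.wait(timeout=30)
    return os.getpid()


def process_task(x):
    # Hypothetical task that relies on the context set by init_job.
    return x * _CONTEXT["scale"]


def run(n_workers=2):
    with multiprocessing.Manager() as manager:
        barrier = manager.Barrier(n_workers)
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            # Submit exactly one init job per worker.
            init_futures = [
                executor.submit(init_job, barrier, {"scale": 10})
                for _ in range(n_workers)
            ]
            pids = {f.result() for f in init_futures}
            results = list(executor.map(process_task, range(5)))
    return pids, results


if __name__ == "__main__":
    worker_pids, values = run(2)
    print(len(worker_pids), values)
```

The barrier is what makes this safe: without it, a fast worker could run several init jobs while another worker never receives one and later fails on a task because its context is missing.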
