Multiprocessing.Pool spawns new childern after terminate () on Linux / Python2.7?

I have an executable file that I need to run very often, with different parameters. To do this, I wrote a small Python (2.7) wrapper using the multiprocessing module following the pattern below.

My code looks like this:

try:
     logging.info("starting pool runs")
     pool.map(run_nlin, params)
     pool.close()
 except KeyboardInterrupt:
     logging.info("^C pressed")
     pool.terminate()
 except Exception, e:
     logging.info("exception caught: ", e)
     pool.terminate()
 finally:
     time.sleep(5)
     pool.join()
     logging.info("done")

      

My working function is located here:

class KeyboardInterruptError(Exception): pass

def run_nlin((path_config, path_log, path_nlin, update_method)):
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline += [update_method, ]
            sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        time.sleep(5)
        raise KeyboardInterruptError()
    except:
        raise

      

path_config

is the path to the configuration file for the binary program; which contains, for example, the start date of the program for.

When I start the shell everything looks fine. However, when I click ^C

, the wrapper script seems to start additional processes numproc

from the pool before exiting. As an example, when I run the script for days 1-10, I can see in the output ps aux

that two instances of the binary are running (usually for days 1 and 3). Now when I click ^C

, the shell script exits, the binaries for days 1 and 3 are gone, but there are new binaries running for days 5 and 7.

So it seems to me that it Pool

starts another process numproc

before it finally dies.

Any ideas what is going on here and what I can do about it?

+3


source to share


1 answer


In this page, Jesse Knoller, the author of the multiprocessing module, shows that the correct way to handle it KeyboardInterrupt

is to have return subprocesses - not to reraise the exception. This allows the main process to terminate the pool.

However, as the following code shows, the main process does not reach the block except KeyboardInterrupt

until all of the tasks generated have been completed pool.map

. This is why (I suppose) you see additional calls to your worker function run_nlin

, after clicking Ctrl-C

.

A possible workaround is to check all operational features if installed multiprocessing.Event

. If the event was set, ask the worker to release it early, otherwise continue the calculation.


import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warn("Running worker({x!r})".format(x = x))
            time.sleep(3)
        else:
            logger.warn("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warn("terminating is set")        
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()    
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
    params = range(12)
    try:
         logger.warn("starting pool runs")
         result = pool.map(worker, params)
         pool.close()
    except KeyboardInterrupt:
        logger.warn("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warn('done: {r}'.format(r = result))

if __name__ == '__main__':
    main()

      


Running the script gives:



% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)

      

The Ctrl key was pressed here; each of the workers sets up an event terminating

. We really only need to install it, but it works despite a little inefficiency.

  C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set

      

All other queued tasks are now executed pool.map

:

[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!

      

Finally, the main process reaches the block except KeyboardInterrupt

.

[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []

      

+12


source







All Articles