Why does this little snippet hang when using multiprocessing with maxtasksperchild, numpy.random.randint and numpy.random.seed?

I have a Python script that processes numpy arrays and images in a random way. To get correct randomness inside the spawned processes, I pass a random seed from the main process to the workers for them to be seeded with.

When I use maxtasksperchild for Pool, my script hangs after Pool.map has been run several times.

Below is a minimal snippet that reproduces the problem:

# This code stops after multiprocessing.Pool workers are replaced one single time.
# They are replaced due to maxtasksperchild parameter to Pool
from multiprocessing import Pool
import numpy as np

def worker(n):
    # Removing np.random.seed solves the issue
    np.random.seed(1) #any seed value
    return 1234 # trivial return value

# Removing maxtasksperchild solves the issue
ppool = Pool(20, maxtasksperchild=5)
i=0
while True:
    i += 1
    # Removing np.random.randint(10) or taking it out of the loop solves the issue
    rand = np.random.randint(10)
    l = [3]  # trivial input to ppool.map
    result = ppool.map(worker, l)
    print i, result[0]

      

This is the output:

1 1234
2 1234
3 1234
...
...
...
99 1234
100 1234 # at this point workers should've reached maxtasksperchild tasks
101 1234
102 1234
103 1234
104 1234
105 1234
106 1234
107 1234
108 1234
109 1234
110 1234

and then it freezes forever.

I could replace numpy.random with Python's random module and be done with the problem. However, in my actual application the worker executes user code (given as an argument to worker) that I have no control over, and I would like to allow that user code to use numpy.random functions. So I intentionally want to seed the global random generator (for each process independently).

This has been tested with Python 2.7.10, numpy 1.11.0, 1.12.0 and 1.13.0, on both Ubuntu and OSX.



3 answers


It turns out that this comes from a buggy interaction between Python's threading.Lock and multiprocessing.

np.random.seed and most of the np.random.* functions use a threading.Lock for thread safety. An np.random.* function generates a random number and then updates the seed (which is shared between threads), which is why the lock is needed. See np.random.seed and cont0_array (used by np.random.random() and others).

Now, how can this cause the problem in the above snippet?



In a nutshell, the snippet hangs because the threading.Lock state is inherited across fork. So when a child is forked at the exact moment the lock is held in the parent (by np.random.randint(10)), the child deadlocks (inside np.random.seed).
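
The hazard is not numpy-specific. Here is a minimal, self-contained sketch of the same fork-while-locked deadlock using only the standard library (illustrative only; it deadlocks by design, and all names are made up):

import os
import threading
import time

lock = threading.Lock()

def hold_lock():
    with lock:         # a worker thread in the parent holds the lock...
        time.sleep(2)  # ...while the main thread forks below

threading.Thread(target=hold_lock).start()
time.sleep(0.5)        # make sure the lock is already held when we fork

pid = os.fork()        # POSIX only, like multiprocessing's default on Linux
if pid == 0:
    # The child inherits the locked lock but not the thread that held it,
    # so this acquire never returns -- the same deadlock as in the snippet.
    lock.acquire()
    os._exit(0)
else:
    os.waitpid(pid, 0)  # the parent waits forever for the stuck child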

@njsmith explains it in this github issue: https://github.com/numpy/numpy/issues/9248#issuecomment-308054786

multiprocessing.Pool spawns a background thread to manage workers: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L170-L173

This background thread loops, calling _maintain_pool: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L366

If a worker exits, e.g. due to a maxtasksperchild limit, then _maintain_pool calls _repopulate_pool: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L240

And then _repopulate_pool spawns new workers, still in this background thread: https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L224

So what happens is that eventually you get unlucky: at the same moment that your main thread is calling some np.random function and holding the lock, multiprocessing decides to fork a child, which starts out with the np.random lock already held but with the thread that was holding it gone. The child then tries to call np.random, which requires taking the lock, and so the child deadlocks.

The simple workaround here is to not use fork-based multiprocessing. If you use the spawn or forkserver start methods, this should go away.

For a proper fix... ugh. I guess we need to register a pthread_atfork pre-fork handler that takes the np.random lock before the fork and then releases it afterwards? And really we need to do this for every lock in numpy, which requires something like keeping a weak reference to every RandomState object, and _FFTCache also has a lock...

(On the plus side, this would also give us a chance to reinitialize the global random state in the child, which we really should be doing in cases where the user hasn't explicitly seeded it.)
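
On Python 3.4+ the workaround translates into something like the following (a minimal sketch; spawn requires the worker to be importable, hence the __main__ guard; the question's Python 2.7 has no alternative start methods on Unix):

import multiprocessing
import numpy as np

def worker(n):
    np.random.seed(1)
    return 1234

if __name__ == '__main__':
    # spawn starts each child from a fresh interpreter instead of fork(),
    # so no half-acquired lock state is inherited from the parent
    ctx = multiprocessing.get_context('spawn')
    ppool = ctx.Pool(20, maxtasksperchild=5)
    print(ppool.map(worker, [3]))

The "proper fix" described above is the pthread_atfork pattern, which Python 3.7+ exposes as os.register_at_fork. A sketch of the idea with a stand-in lock (numpy's internal generator lock is not public, so some_lock here is hypothetical):

import os
import threading

some_lock = threading.Lock()  # hypothetical stand-in for numpy's internal lock

# Take the lock just before fork() and release it on both sides afterwards,
# so neither the parent nor the child is left with a dangling held lock.
os.register_at_fork(
    before=some_lock.acquire,
    after_in_parent=some_lock.release,
    after_in_child=some_lock.release,
)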



Using numpy.random.seed is not thread-safe. numpy.random.seed changes the seed globally, whereas, as far as I understand, you are trying to change the seed locally.

See the docs.



If what you are actually trying to achieve is seeding a generator at the start of each worker, the following is a solution:

def worker(n):
    # Use a local RandomState instead of seeding the global generator
    randgen = np.random.RandomState(45678) # RandomState, not seed!
    # ...do something with randgen...
    return 1234 # trivial return value
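
If, in addition, each worker should draw from an independent stream, one option is to pass a distinct seed per task, for example:

import numpy as np
from multiprocessing import Pool

def worker(seed):
    # Local generator: the global np.random state (and its lock) is never touched
    rng = np.random.RandomState(seed)
    return rng.randint(10)

if __name__ == '__main__':
    pool = Pool(4)
    print(pool.map(worker, range(8)))  # distinct seeds -> independent streams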

      



Making this a full answer, since it doesn't fit in a comment.

After playing around a bit, this smells like a numpy.random bug. I was able to reproduce the freeze, and on top of that there were some other strange things that shouldn't happen, like manually seeding the generator not working.

import multiprocessing
import numpy as np

def rand_seed(rand, i):
    print(i)
    np.random.seed(i)
    print(i)
    print(rand())

def test1():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed, (np.random.random_sample, i)).get()
         for i in range(5)]

test1()

      

which outputs:

0
0
0.3205032737431185
1
1
0.3205032737431185
2
2
0.3205032737431185
3
3
0.3205032737431185
4
4
0.3205032737431185

      

On the other hand, not passing np.random.random_sample as an argument works just fine.

def rand_seed2(i):
    print(i)
    np.random.seed(i)
    print(i)
    print(np.random.random_sample())

def test2():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed2, (i,)).get()
         for i in range(5)]

test2()

      

which outputs:

0
0
0.5488135039273248
1
1
0.417022004702574
2
2
0.43599490214200376
3
3
0.5507979025745755
4
4
0.9670298390136767

      

This suggests that there is some serious trickery going on behind the scenes. Not sure what else to say about it, though...

It's basically as if numpy.random.seed changed not only the seed state, but also the random_sample function itself.
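
A plausible explanation (an assumption, not something verified above): np.random.random_sample is a bound method of the hidden global RandomState, so pickling it to ship to the worker also pickles a snapshot of the generator's state, and the copy unpickled in the child is decoupled from the child's global generator that np.random.seed reseeds. A quick sketch to check that hypothesis locally, no multiprocessing needed:

import pickle
import numpy as np

# Pickling the bound method drags a snapshot of the generator state with it
f = pickle.loads(pickle.dumps(np.random.random_sample))

np.random.seed(0)                 # reseed the global generator...
print(f())                        # ...the unpickled copy is unaffected
print(np.random.random_sample())  # the global generator does respond to the seed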
