Multiprocessing hangs when more than 992 integers are returned as the result

I am following this book http://doughellmann.com/pages/python-standard-library-by-example.html

Along with some online links. I have an algorithm set up for multiprocessing where I have a large set of dictionaries and some calculations. I am using multiprocessing to split up the indices over which the dictionary calculations are performed. To make the question more general, I replaced the algorithm with a few simple return values. From searching the internet and other SO questions, I think it has to do with the join method.

The structure is the same:

Create some bogus data, call the manager function for multiprocessing, create a queue, and split the data across the chosen number of processes, giving each process its own index range. Finally, join the processes and print the results.

What I figured out is that if the function used by the processes returns range(0, 992), it works fine; if it returns range(0, 993), it hangs. I tried this on two different computers with different specifications.
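In case it is relevant, the cutoff does not seem to be about those particular numbers so much as the size of the result each process puts on the queue. A rough diagnostic (not part of the algorithm, and the queue may not use the same pickle protocol, so this is only approximate) to see how large the pickled payload is:

import pickle

#Diagnostic only: approximate size in bytes of the payload each worker puts on the queue
print len(pickle.dumps(range(0, 992)))
print len(pickle.dumps(range(0, 993)))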

The code is here:

import multiprocessing

def main():
    data = []
    for i in range(0,10):
        data.append(i)

    CalcManager(data,start=0,end=50)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we don't go past the size of the data
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    
    #Join the processes to wait for all work to be finished
    for p in procs:
        p.join()

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = range(0, 992)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
Is there something specific about these numbers, or am I just missing something basic that has nothing to do with them?

From my searches it seems like it is a memory issue with the join method, but the book doesn't really explain how to solve this problem with this setup. Is it possible to keep this structure (I mostly understand it, so I would like to keep using it) and still return large results? I know there are other ways to exchange data between processes, but that is not what I want; I just want to return the values and collect them into the same array after completion.


1 answer


I cannot reproduce this on my machine, but it sounds like the items put into the queue were not being flushed to the underlying pipe. This can cause a deadlock when you try to join the process, according to the docs:

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
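For illustration, here is a minimal sketch, separate from your code, of the situation the docs describe; the payload of 100000 integers is just an arbitrary "big enough" value, and on a system where the pickled payload exceeds the pipe buffer the join call hangs:

import multiprocessing

def worker(q):
    #Put a payload that is too large to be flushed to the pipe in one go
    q.put(range(100000))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()           #deadlocks here: the child cannot exit until the queue is drained
    print len(q.get())

Swapping the last two lines (get first, then join) lets it finish normally.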

If you are in this situation, your p.join() calls will hang forever because the queue is still buffering data. You can avoid this by consuming from the queue before joining the processes:



#Print out the results
for i in range(nprocs):
    result = result_q.get()
    print result

#Join the processes to wait for all work to be finished
for p in procs:
    p.join()

This does not change the way the code behaves: each result_q.get() call blocks until a result has been placed on the queue, which has the same effect as calling join on all the processes before calling get. The only difference is that you avoid the deadlock.
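And if, as you mentioned, you want the returned values collected into a single array, the same ordering still applies; roughly (an untested sketch, keeping your structure):

#Collect every process's result into one list first, then join
all_results = []
for i in range(nprocs):
    all_results.extend(result_q.get())

for p in procs:
    p.join()

print len(all_results)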
