Why does join() hang for large datasets?

I process some data by sending it to a queue that is consumed by workers, which in turn put their results on another queue; that second queue is finally read to compute the final result. The example below queues self.size copies of the number 1, and the workers return the value 10 for each item:

import multiprocessing
import Queue

class Dispatch():
    def __init__(self):
        self.q_in = multiprocessing.Queue()
        self.q_out = multiprocessing.Queue()
        self.maxproc = 4
        self.size = 1000

    def putdata(self):
        # put data to the queue
        for d in range(self.size):
            self.q_in.put(1)
        # put sentinels at the end of the FIFO queue
        for i in range(self.maxproc):
            self.q_in.put("STOP")

    def processing(self):
        procs = []
        for i in range(self.maxproc):
            p = multiprocessing.Process(target=self.multiply)
            procs.append(p)
            p.start()
        return procs

    def multiply(self):
        while True:
            item = self.q_in.get()
            if item == 'STOP':
                print("{0} exiting".format(multiprocessing.current_process()))
                return
            self.q_out.put(item * 10)

    def getdata(self):
        total = 0
        while True:
            try:
                item = self.q_out.get(block=False)
                total += item
            except Queue.Empty:
                print("done getdata")
                break
        print("data out: {0}".format(total))

if __name__=="__main__":
    dis = Dispatch()
    procs = dis.processing()
    dis.putdata()
    for p in procs:
        p.join()
    dis.getdata()
    print("done __main__")

      

This code breaks when self.size gets bigger. For example, it works for self.size = 1000:

<Process(Process-1, started)> exiting
<Process(Process-4, started)> exiting
<Process(Process-2, started)> exiting
<Process(Process-3, started)> exiting
done getdata
data out: 10000
done __main__

However, for self.size = 10000, it outputs

<Process(Process-2, started)> exiting
<Process(Process-4, started)> exiting
<Process(Process-1, started)> exiting
<Process(Process-3, started)> exiting

and then freezes. It seems to hang at the join() calls: the worker processes print their exit messages, but getdata() is never reached.

Why is that? Is there a limit on the queue size? (I monitored memory and CPU usage during the run, and both look fine.) Or is it something else?
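A likely explanation, based on the "Joining processes that use queues" section of the Python multiprocessing programming guidelines: a process that has put items on a multiprocessing.Queue waits before terminating until its background feeder thread has flushed all buffered items into the underlying pipe. The workers therefore print "exiting" (their target function has returned), but they cannot actually terminate while their results sit unread, and join() waits on them forever. With 1000 results the data presumably still fits in the OS pipe buffer, which would explain why the small case works. The sketch below is one way to restructure the pipeline under that assumption: the main process drains q_out with blocking get() calls before calling join(), and each worker puts a "STOP" sentinel on q_out so the consumer knows when every result has arrived (the non-blocking get() in getdata() could otherwise stop early while results are still in flight). The names run_pipeline and the module-level multiply function are illustrative, not taken from the original code.

```python
import multiprocessing

STOP = "STOP"  # sentinel value, same as in the question


def multiply(q_in, q_out):
    """Worker: multiply each input item by 10 until the STOP sentinel arrives."""
    for item in iter(q_in.get, STOP):
        q_out.put(item * 10)
    # Tell the consumer that this worker has produced its last result.
    q_out.put(STOP)


def run_pipeline(size, maxproc):
    """Hypothetical helper: feed `size` ones to `maxproc` workers and sum the results."""
    q_in = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=multiply, args=(q_in, q_out))
             for _ in range(maxproc)]
    for p in procs:
        p.start()

    # Put the data, then one sentinel per worker.
    for _ in range(size):
        q_in.put(1)
    for _ in range(maxproc):
        q_in.put(STOP)

    # Drain q_out with blocking get() BEFORE join(): a worker cannot exit
    # until everything it put on q_out has been flushed into the pipe.
    total = 0
    finished = 0
    while finished < maxproc:
        item = q_out.get()
        if item == STOP:
            finished += 1
        else:
            total += item

    # Every worker has delivered its sentinel, so join() can return.
    for p in procs:
        p.join()
    return total


if __name__ == "__main__":
    print("data out: {0}".format(run_pipeline(10000, 4)))
```

Counting one output sentinel per worker works because each worker's own items reach the pipe in the order it put them, so a worker's "STOP" always arrives after all of its results.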
