Can't execute .get() from multiprocessing.Queue

I am building a web application to handle ~60,000 (and growing) large files, run some analysis on each, and return a "best guess" that needs to be verified by a user. The files will be categorized so that not every file is downloaded, but I'm still left with a scenario where I might have to process 1000+ files at a time.

These are large files that can take 8-9 seconds each to process, and with 1000+ files it is impractical to make the user wait 8 seconds between reviews, or 2+ hours while the files are processed before work can start.

To overcome this, I decided to use multiprocessing to spawn several workers, each of which picks files from an input queue, processes them, and puts the results into an output queue. Another method polls the output queue and feeds items to the client as they become available.

This works well until partway through, when the queue arbitrarily stops returning items. We are using gevent with Django and uWSGI in our environment, and I know that creating a child process via multiprocessing in a gevent context leaves the child with an undesirable event-loop state: sockets created before the fork are duplicated in the child. So I decided to use gipc to help handle the child processes.

An example of my code (I cannot post my actual code):

import multiprocessing
import gipc
from item import Item

MAX_WORKERS = 10

class ProcessFiles(object):

    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.file_count = 0

    def query_for_results(self):
        # Query the db for records of files to process.
        # Return the results and set self.file_count to
        # the number of records returned.
        pass

    def no_items_remain(self):
        # Return True once the input queue has been drained
        # (stub; the real check is elided here).
        pass

    # The subprocess.
    def worker(self):
        # Chisel away at the input queue until no items remain.
        while True:
            if self.no_items_remain():
                return

            item = self.input_queue.get()
            item.process()
            self.output_queue.put(item)

    def start(self):
        # Get results and store in Queue for processing
        results = self.query_for_results()
        for result in results:
            item = Item(result)
            self.input_queue.put(item)

        # Spawn workers to process files.
        for _ in xrange(MAX_WORKERS):
            process = gipc.start_process(self.worker)

        # Poll for items to send to client.
        return self.get_processed_items()

    def get_processed_items(self):

        # Wait for the output queue to hold at least one item.
        # When an item becomes available, yield it to the client.
        count = 0
        while count != self.file_count:
            #item = self._get_processed_item()
            # Debugging:
            try:
                item = self.output_queue.get(timeout=1)
            except:
                print '\tError fetching processed item. Retrying...'
                continue

            if item:
                print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
                count += 1
                yield item
        yield 'end'

I expect the output to show the current count of the queue as each item is fetched and yielded:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1

However, the script only manages to yield a few items before it starts failing:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    ...

My question is: what exactly is going on? Why does the call to get stop returning items, and how can I get back the items I am expecting and avoid this?


1 answer


What is the actual exception being thrown when you fail to fetch an item? You are blindly catching every exception that might be raised. Also, why not just call get without a timeout? You retry immediately without doing anything else in between, so you might as well let get block until an item is ready.
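
For example, a minimal sketch of a narrower fetch, assuming Python 2 to match the question's print syntax (Queue.Empty is the exception multiprocessing.Queue.get raises on timeout):

from Queue import Empty  # raised by multiprocessing.Queue.get on timeout

def get_item(output_queue, timeout=1):
    # Fetch one item; only swallow the "nothing ready yet" case so
    # any other failure (e.g. an OSError from a broken pipe) surfaces.
    while True:
        try:
            return output_queue.get(timeout=timeout)
        except Empty:
            continue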

As for the problem, I think what is happening is that gipc closes the pipes associated with your queues and thereby breaks them. I suspect you are not getting a queue.Empty at all, but rather an OSError. See the bug report for details.
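
If that is the cause, one workaround is to move the inter-process channel onto gipc's own pipes, since only handles explicitly passed to gipc.start_process are kept open in the child. A minimal, self-contained sketch under that assumption (the worker and the doubling stand in for the question's Item.process):

import gipc

def worker(task_reader, result_writer):
    # Only pipe ends handed over via gipc.start_process() stay open
    # in the child; gipc closes everything else, which is what breaks
    # an inherited multiprocessing.Queue.
    while True:
        task = task_reader.get()
        if task is None:  # sentinel: no more work
            return
        result_writer.put(task * 2)  # stand-in for item.process()

if __name__ == '__main__':
    with gipc.pipe() as (task_reader, task_writer):
        with gipc.pipe() as (result_reader, result_writer):
            proc = gipc.start_process(
                target=worker, args=(task_reader, result_writer))
            for n in xrange(5):
                task_writer.put(n)
            task_writer.put(None)
            for _ in xrange(5):
                print result_reader.get()
            proc.join()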

Alternatively, you could use a process pool, initializing the pool before anything gevent-related happens (which means you don't have to worry about the event loop issue). Submit the jobs to the pool using imap_unordered and you should be fine.



Your start function would then look something like this:

def start(self):
    results = self.query_for_results()
    # Hand the items to the pool; results come back in whatever
    # order they finish. chunksize must be at least 1.
    return self.pool.imap_unordered(
        self.worker, results,
        chunksize=max(1, len(results) // self.num_procs_in_pool))

@staticmethod
def worker(item):
    item.process()
    return item
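
Note that for this to work, the pool must be created before gevent monkey-patches the process, e.g. at module import time. A rough sketch of that ordering (NUM_PROCS, POOL and the attribute names are illustrative assumptions, not from the answer):

import multiprocessing

# Build the pool before gevent is imported or monkey-patched, so the
# workers are forked with clean, un-patched interpreter state.
NUM_PROCS = multiprocessing.cpu_count()
POOL = multiprocessing.Pool(processes=NUM_PROCS)

# Only after the fork: gevent-aware imports, e.g.
# from gevent import monkey; monkey.patch_all()

class ProcessFiles(object):
    def __init__(self):
        self.pool = POOL
        self.num_procs_in_pool = NUM_PROCS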
