In Python, how can I wait for threads on multiple queues?

In the following code I have two queues running different kinds of threads. The threads add work to each other's queues recursively (the queue 1 threads grab some information, the queue 2 thread processes it and adds more work back to queue 1).

The problem is that when the first queue temporarily runs out of items, it is treated as finished, so it never sees the work that the queue 2 (out_queue) thread adds to it afterwards.

I added a time.sleep() call, which is a very hacky fix: for 30 seconds both queues stayed full enough not to run out.

What is the standard Python way of handling this? Should I have only one queue, with tags on the items saying which kind of thread should process them?

import threading
import time
import urllib2
import Queue

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()

            request = urllib2.Request(row[0], None, req_headers)

            # ... some processing ...

            self.out_queue.put([row, http_status, page])

            self.queue.task_done()

class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()

            # ... some processing ...

            queue.put(newrow)  # refers to the global queue defined above

            self.out_queue.task_done()


for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

#populate queue with data
for row in rows:
    queue.put(row)

#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd=None, db='db')

#spawn DatamineThread; if you have multiple, make sure each one has its own MySQL connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)  # the hacky fix described above

#wait on the queue until everything has been processed
queue.join()
out_queue.join()
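
For reference, the one-queue-with-tags alternative I mention above might look roughly like this (sketched in Python 3, where the module is `queue` instead of `Queue`; the `fetch`/`process` handlers are placeholders, not my real processing code):

```python
import queue
import threading

work = queue.Queue()
results = []

def fetch(item):
    # placeholder for the URL-grabbing step; returns follow-up work
    return ('process', item.upper())

def process(item):
    # placeholder for the datamining step; no follow-up work
    results.append(item)
    return None

HANDLERS = {'fetch': fetch, 'process': process}

def worker():
    while True:
        tag, payload = work.get()
        followup = HANDLERS[tag](payload)
        if followup is not None:
            work.put(followup)  # recursive work goes back into the same queue
        work.task_done()

threading.Thread(target=worker, daemon=True).start()

for word in ['foo', 'bar']:
    work.put(('fetch', word))

work.join()  # a single join now covers both kinds of work
print(results)
```

Because all the recursive work flows through one queue, `work.join()` cannot return while any follow-up items are still pending, which avoids the race between the two joins.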

1 answer


Change the workers so that they require a sentinel to exit, instead of exiting when the queue is momentarily empty. In the following code, the worker `howdy` reads items from its input queue. If the value is the sentinel (`None` here, but it could be anything), the worker exits.

As a consequence, you don't have to fiddle with timeouts, which, as you have found, can be quite dodgy. Another implication is that if you have N threads, you must add N sentinels to the input queue to shut down all your workers. Otherwise you will end up with a worker that waits forever: a zombie worker, if you like.

Source



import threading, Queue

def howdy(q):
    for msg in iter(q.get, None):
        print 'howdy,',msg

inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
inq.put( None )        # tell worker to exit

thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()


Output

howdy, whiskey
howdy, syrup
howdy, bitters
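
With N threads, the same pattern needs N sentinels, one per worker. A sketch (written for Python 3, where the module is `queue` rather than `Queue`):

```python
import queue
import threading

N = 3
inq = queue.Queue()
results = []

def worker(q, out):
    # iter(q.get, None) keeps calling q.get() until it returns the
    # sentinel None, at which point the loop (and the thread) ends
    for msg in iter(q.get, None):
        out.append(msg)

threads = [threading.Thread(target=worker, args=(inq, results)) for _ in range(N)]
for t in threads:
    t.start()

for word in 'whiskey syrup bitters'.split():
    inq.put(word)

for _ in range(N):
    inq.put(None)  # one sentinel per worker, or some workers block forever

for t in threads:
    t.join()

print(sorted(results))  # sorted because scheduling order is nondeterministic
```

Each worker consumes exactly one sentinel on its way out, so after N sentinels every thread is guaranteed to terminate and the joins return.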