In Python, how can I wait for threads on multiple queues?
In the following code I have two queues running different kinds of threads. The threads add items to each other's queues recursively (queue 1 grabs some information, queue 2 processes it and adds more back to queue 1).
The problem is that when the first queue temporarily runs out of items, it shuts down, so it never sees the items that queue 2 (out_queue) adds back to it afterwards.
I added the time.sleep(30) call as a very hacky fix; during those 30 seconds both queues stayed full enough not to run out.
What is the standard Python way of handling this? Should I have only one queue, with tags on the items saying which kind of thread should process them?
import threading
import time
import urllib2
import Queue

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            request = urllib2.Request(row, None, req_headers)
            # ... some processing ...
            self.out_queue.put([row, http_status, page])
            self.queue.task_done()

class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            # ... some processing ...
            queue.put(newrow)
            self.out_queue.task_done()

queue = Queue.Queue()
out_queue = Queue.Queue()

for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

# populate queue with data
for row in rows:
    queue.put(row)

# MySQL connector
mysql = MySQLConn(host='localhost', user='root', passwd=None, db='db')

# spawn DatamineThread; if you have several, make sure each one has its own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)

# wait on the queues until everything has been processed
queue.join()
out_queue.join()
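As a side note on the "one queue with tags" idea raised in the question, here is a minimal Python 3 sketch (the `queue` module is the Python 3 name for `Queue`; `fetch`, `worker`, and the `'url'`/`'page'` tags are names invented for this example, standing in for the HTTP and MySQL work). Because each worker calls `task_done()` only after any follow-up item has been `put()` back, `queue.join()` correctly accounts for recursively added work, which sidesteps the "queue temporarily empty" problem without any sleeps:

```python
import queue
import threading

work = queue.Queue()

def fetch(url):
    # placeholder for the urllib2/HTTP work in the original code
    return 'page-for-%s' % url

def worker(results):
    while True:
        tag, payload = work.get()
        try:
            if tag == 'url':
                page = fetch(payload)
                work.put(('page', page))   # feed the other stage of the pipeline
            elif tag == 'page':
                results.append(payload)    # placeholder for the MySQL work
        finally:
            work.task_done()               # only after any follow-up put()

results = []
for _ in range(4):
    threading.Thread(target=worker, args=(results,), daemon=True).start()

for url in ['a', 'b', 'c']:
    work.put(('url', url))

work.join()   # waits for all urls fetched AND all resulting pages processed
print(sorted(results))
```

The key point is that a `put()` of a follow-up item happens before the `task_done()` for the item that produced it, so the queue's unfinished-task count never drops to zero while recursive work is still pending.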
Change the workers so that they need a sentinel to exit, instead of exiting when there are no more jobs in the queue. In the following code the worker reads items from the input queue. If the value is the sentinel (None here, but it could be anything), the worker exits.
As a consequence, you don't have to fiddle with timeouts, which as you've found can be quite dodgy. Another consequence is that if you have N threads, you must add N sentinels to the input queue to kill all your workers; otherwise a worker will end up waiting forever. A zombie worker, if you like.
import threading, Queue

def howdy(q):
    for msg in iter(q.get, None):
        print 'howdy,', msg

inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
inq.put(None)   # tell worker to exit

thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()
howdy, whiskey
howdy, syrup
howdy, bitters
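The example above uses a single worker. Here is a Python 3 sketch of the "N sentinels for N workers" rule (in Python 3 the module is `queue` and `print` is a function; `NUM_WORKERS`, `greeted`, and the lock are names invented for this example): each thread's `iter(q.get, None)` loop stops when that thread pulls its own None, so with one sentinel per worker every thread exits cleanly.

```python
import queue
import threading

NUM_WORKERS = 3
inq = queue.Queue()
greeted = []                        # collected results, guarded by a lock
lock = threading.Lock()

def howdy(q):
    for msg in iter(q.get, None):   # loop until this thread pulls a None
        with lock:
            greeted.append(msg)

threads = [threading.Thread(target=howdy, args=(inq,))
           for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for word in 'whiskey syrup bitters'.split():
    inq.put(word)
for _ in range(NUM_WORKERS):        # one sentinel per worker
    inq.put(None)

for t in threads:
    t.join()                        # every worker has seen its sentinel

print(sorted(greeted))
```

A worker that consumes a sentinel exits without reading further, so exactly one None reaches each thread; fewer sentinels than threads would leave the extras blocked on `get()` forever.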