In Python, how can I wait for threads on multiple queues?
In the following code, I have two queues running different kinds of worker threads. The threads feed each other recursively: the threads on queue 1 grab some information, and the threads on queue 2 process it and add more items back to queue 1.
The problem is that when the first queue temporarily runs out of material, its join() returns, so it never sees the items that queue 2 (out_queue) adds afterwards.
I worked around it with a time.sleep(30) call, which is a very hacky fix; during those 30 seconds both queues stay full enough not to run out.
What is the standard Python way of handling this? Should I have just one queue, with a tag on each item saying which kind of thread should process it?
import threading
import time
import urllib2
import Queue

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            request = urllib2.Request(row[0], None, req_headers)
            # ... some processing ...
            self.out_queue.put([row, http_status, page])
            self.queue.task_done()

class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            # ... some processing ...
            queue.put(newrow)
            self.out_queue.task_done()

for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

# populate queue with data
for row in rows:
    queue.put(row)

# MySQL connector
mysql = MySQLConn(host='localhost', user='root', passwd=None, db='db')

# spawn DatamineThread; if you have several, make sure each has its own MySQL connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)

# wait on the queues until everything has been processed
queue.join()
out_queue.join()
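To make the single-queue idea concrete, here is a minimal sketch of what I mean (the names and the toy "work" are invented for illustration; written for Python 3, where the module is queue rather than Queue): every item carries a tag, and each worker dispatches on the tag.

```python
import queue
import threading

work = queue.Queue()
results = []

def worker():
    while True:
        item = work.get()
        if item is None:              # sentinel: exit
            work.task_done()
            break
        tag, payload = item
        if tag == 'fetch':
            # ... grab the page, then enqueue it for processing ...
            work.put(('process', payload.upper()))
        elif tag == 'process':
            results.append(payload)
        work.task_done()

work.put(('fetch', 'row1'))
t = threading.Thread(target=worker)
t.start()
work.join()        # both stages drain through the same queue
work.put(None)     # now tell the worker to exit
t.join()
```

Because both stages flow through one queue, a single work.join() sees the re-enqueued items and only returns once everything is done.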
Change the workers so that they wait for a sentinel to exit, instead of exiting when they run out of jobs in the queue. In the following code the worker howdy
reads items from its input queue. If the value is the sentinel ( None
here, but it could be anything), the worker exits.
As a consequence, you don't have to fiddle with timeouts, which as you've found can be quite dodgy. Another implication is that if you have N threads, you must add N sentinels to the input queue to kill your workers. Otherwise you'll end up with a worker that waits forever. A zombie worker, if you like.
Source
import threading, Queue

def howdy(q):
    for msg in iter(q.get, None):
        print 'howdy,', msg

inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
inq.put(None)    # tell worker to exit

thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()
Output
howdy, whiskey
howdy, syrup
howdy, bitters
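The "N sentinels for N threads" point above can be sketched like this (my own extension of the example, written for Python 3, where the module is queue): each worker consumes exactly one sentinel on its way out, so you enqueue one per worker.

```python
import queue
import threading

N = 3
inq = queue.Queue()
done = []                             # which workers exited cleanly

def worker(n):
    for msg in iter(inq.get, None):   # stop when the sentinel arrives
        pass                          # ... real work here ...
    done.append(n)                    # record a clean exit

threads = [threading.Thread(target=worker, args=[i]) for i in range(N)]
for t in threads:
    t.start()

for word in 'whiskey syrup bitters'.split():
    inq.put(word)
for _ in range(N):                    # one sentinel per worker
    inq.put(None)
for t in threads:
    t.join()

print(sorted(done))                   # all N workers exited cleanly
```

With only one sentinel, two of the three workers would block on inq.get() forever: the zombie workers mentioned above.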
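The same sentinel idea extends to a two-queue pipeline like the one in the question, as long as the data flows one way: the first stage forwards the sentinel downstream when it exits, so only the first queue needs to receive it explicitly. A minimal sketch (names invented for illustration, Python 3); note it does not by itself handle the feedback loop where stage 2 re-feeds stage 1, which still needs the join-on-queues accounting from the question.

```python
import queue
import threading

q1, q2 = queue.Queue(), queue.Queue()
results = []

def stage1():
    for item in iter(q1.get, None):
        q2.put(item.upper())          # hand off to the next stage
    q2.put(None)                      # forward the sentinel downstream

def stage2():
    for item in iter(q2.get, None):
        results.append(item)

t1 = threading.Thread(target=stage1)
t2 = threading.Thread(target=stage2)
t1.start()
t2.start()

for word in 'whiskey syrup bitters'.split():
    q1.put(word)
q1.put(None)                          # one sentinel, relayed stage to stage
t1.join()
t2.join()                             # no join() on the queues, no sleep()

print(results)
```

Joining the threads, rather than the queues, is what removes the need for the time.sleep(30) workaround.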