How do I close threads in Python?
I have a problem with too many threads. I think queue.join() only finishes the queue and does not close its threads.
In my script, I need to check 280k domains and, for each domain, get a list of its MX records and get the IPv6 address of the servers, if any.
I used threads, and thanks to them the script runs many times faster. But there is a problem: although there is a join() on the queue, the number of live threads keeps growing until an error says that no new thread can be created (an OS limit?).
How can I terminate / close / stop / reset the threads after each iteration of the for-loop, when I fetch a new domain from the database?
Definition of the thread class:
class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain
    def run(self):
        while True:
            self.mx = self.queue.get()
            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5
            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"
            lock.acquire()
            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
            lock.release()
            self.queue.task_done()
How the thread class is used (the main for-loop is not shown; this is just part of its body):
try:
    answers = resolver.query(domain, 'MX')
    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()
    for mx in answers:
        qMX.put(mx.exchange)
    qMX.join()
except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"
print "end of script"
I tried:
for thread in threads:
    thread.join()
after the queue had been processed, but thread.join() never returns, even though there should be nothing to wait for: once queue.join() has returned, the threads have nothing left to do.
I don't understand why you need the Queue in the first place.
After all, in your design each thread processes exactly one task.
You should pass that task to the thread when you create it.
This way you don't need the Queue, and you get rid of the while loop:
class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx
Then you can get rid of the while loop inside the run method:
    def run(self):
        res = dns.resolver.Resolver()
        res.lifetime = 1.5
        res.timeout = 0.5
        try:
            answers = res.query(self.mx,'AAAA')
            ip_mx = str(answers[0])
        except:
            ip_mx = "N/A"
        with lock:
            # use the id stored on this thread, not a global id_domain
            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(self.id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
Create one thread for each task:
threads = []  # start with a fresh list for each domain
for mx in answers:
    t = MX_getAAAA_thread(id_domain, mx.exchange)  # no queue argument any more
    t.setDaemon(True)
    threads.append(t)
    t.start()
and join them:
for thread in threads:
    thread.join()
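To make the idea concrete, here is a minimal, self-contained sketch of the one-thread-per-task pattern. It is written for Python 3 and makes several assumptions that are not in the original code: `lookup_aaaa` is a hypothetical stand-in for the real DNS query (it does no network access), and a plain list named `results` stands in for the database insert.

```python
import threading

results = []              # stands in for the database table (assumption)
lock = threading.Lock()   # guards access to results


def lookup_aaaa(mx):
    """Hypothetical placeholder for the real AAAA lookup; no network access."""
    return "N/A"


class MXWorker(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

    def run(self):
        ip_mx = lookup_aaaa(self.mx)
        with lock:
            results.append((self.id_domain, self.mx, ip_mx))
        # run() returns here, so the thread ends on its own


def process_domain(id_domain, mx_hosts):
    # One short-lived thread per MX host, all joined before returning.
    threads = [MXWorker(id_domain, mx) for mx in mx_hosts]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return threads


finished = process_domain(1, ["mx1.example.com", "mx2.example.com"])
```

Because `run()` has no loop, each thread finishes as soon as its single task is done, and `join()` returns promptly instead of hanging.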
What I often do when my thread contains such an endless loop is to change the loop condition to something I can control from the outside. For example:
def run(self):
    self.keepRunning = True
    while self.keepRunning:
        # do stuff
This way I can change the property keepRunning from the outside and set it to False to gracefully terminate the thread the next time the loop condition is checked.
By the way, since you seem to spawn exactly one thread for each item you queue, you don't even need a loop inside the thread at all. I would still say that you should always enforce a maximum number of threads that can be spawned this way, i.e. for i in range(min(len(answers), MAX_THREAD_COUNT)):
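As a rough illustration of a controllable loop combined with a thread cap, the sketch below uses a `threading.Event` as the stop flag and a timeout on `get()` so that the loop condition is actually re-checked while the queue is empty. It targets Python 3 (where the module is named `queue`); the value of `MAX_THREAD_COUNT`, the `StoppableWorker` and `run_batch` names, and the "work" of copying items into a second queue are all assumptions for the example.

```python
import queue
import threading

MAX_THREAD_COUNT = 4   # assumed cap; tune for your system


class StoppableWorker(threading.Thread):
    def __init__(self, tasks, done):
        threading.Thread.__init__(self)
        self.tasks = tasks
        self.done = done
        self.stop_event = threading.Event()

    def run(self):
        while not self.stop_event.is_set():
            try:
                # The timeout lets the loop re-check the stop flag
                # instead of blocking forever on an empty queue.
                item = self.tasks.get(timeout=0.1)
            except queue.Empty:
                continue
            self.done.put(item)      # placeholder for the real work
            self.tasks.task_done()


def run_batch(items):
    tasks = queue.Queue()
    done = queue.Queue()
    count = min(len(items), MAX_THREAD_COUNT)
    workers = [StoppableWorker(tasks, done) for _ in range(count)]
    for w in workers:
        w.start()
    for item in items:
        tasks.put(item)
    tasks.join()                     # wait until every item is processed
    for w in workers:
        w.stop_event.set()           # ask each worker to leave its loop
    for w in workers:
        w.join()                     # returns once the worker has exited
    return workers, [done.get() for _ in range(len(items))]


workers, processed = run_batch(list(range(10)))
```

With the cap in place the workers keep their loop, since there can be more items than threads; the flag is only set after `tasks.join()` has confirmed that all items were handled.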
Alternative
In your case, instead of terminating the threads in each iteration of the for-loop, you can simply reuse them. From what I gather from the source of your thread class, the only thing that ties a thread to a particular iteration is the id_domain property that you set when you create it. However, you can just pass this through the queue as well, so that the threads are completely independent of the iteration and can be reused.
It might look like this:
qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()
for id_domain in enumerateIdDomains():
    answers = resolver.query(id_domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple
qMX.join()
for thread in threads:
    thread.keepRunning = False
Of course, you need to change your thread class a little:
class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff, then call self.queue.task_done()
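One caveat with a plain flag: a worker that is blocked inside `queue.get()` does not re-check `keepRunning` until another item arrives, so setting the flag alone may not end it. A common alternative, sketched below for Python 3, is to push one sentinel value per worker onto the queue. The `STOP` sentinel, the `worker` and `run_pool` names, the results list, and the `"N/A"` placeholder for the lookup are assumptions made for illustration, not part of the original code.

```python
import queue
import threading

MAX_THREAD_COUNT = 4   # assumed pool size
STOP = None            # sentinel telling a worker to exit (assumption)


def worker(tasks, results, lock):
    while True:
        item = tasks.get()
        try:
            if item is STOP:
                return                   # leave the loop; the thread ends
            id_domain, mx = item         # each task carries its own id_domain
            with lock:
                # Placeholder for the AAAA lookup and the INSERT.
                results.append((id_domain, mx, "N/A"))
        finally:
            tasks.task_done()            # also runs for the sentinel


def run_pool(domains):
    tasks = queue.Queue()
    results = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(tasks, results, lock))
               for _ in range(MAX_THREAD_COUNT)]
    for t in threads:
        t.start()
    for id_domain, mx_hosts in domains:
        for mx in mx_hosts:
            tasks.put((id_domain, mx))
    tasks.join()                         # all real tasks are processed
    for _ in threads:
        tasks.put(STOP)                  # one sentinel per worker
    for t in threads:
        t.join()                         # every worker has now exited
    return threads, results


threads, results = run_pool([(1, ["mx1.example.com", "mx2.example.com"]),
                             (2, ["mail.example.org"])])
```

The same fixed set of threads serves every domain, so the number of live threads never exceeds `MAX_THREAD_COUNT`, and the final `join()` calls return because each worker receives its own sentinel.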