How do I close threads in Python?

I have a problem with too many threads. I think queue.join() only closes the queue, not its threads.

In my script, I need to check 280k domains and, for each domain, get the list of its MX records and the IPv6 address of each mail server, if it has one.

I used threads, and thanks to them the script runs many times faster. But there is a problem: although I call join() on the queue, the number of live threads keeps growing until an error says that no new thread can be created (an OS limit?).

How can I terminate (close, stop, reset) the threads after each iteration of the for-loop, when I fetch a new domain from the database?

The thread class definition:

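import threading
import dns.resolver

# lock, cursor and db are module-level globals created elsewhere in the script
class MX_getAAAA_thread(threading.Thread):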
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain


    def run(self):
        while True:
            self.mx = self.queue.get()

            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5

            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"

            lock.acquire()

            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(self.id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

            lock.release()
            self.queue.task_done()

      

How the class is used (the main for-loop is not shown, this is just part of its body):

try:
    answers = resolver.query(domain, 'MX')

    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()

    for mx in answers:
        qMX.put(mx.exchange)

    qMX.join()

except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"

print "end of script"

      

I tried:

for thread in threads:
    thread.join()

      

after the queue has been processed, but thread.join() never returns, even though there should be nothing to wait for: once queue.join() has returned, the threads have no work left.
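To see what queue.join() actually does, here is a minimal Python 3 sketch (where the Queue module is named queue) that mimics the structure above with hypothetical MX host names and no DNS or database access. queue.join() only blocks until every item that was put() has been matched by a task_done() call; it does not stop the consumer threads, so each domain leaves its workers behind, blocked in get():

```python
import queue
import threading

def worker(q):
    # Same shape as the question's run(): an endless loop around q.get()
    while True:
        mx = q.get()
        # ... AAAA lookup and database insert would go here ...
        q.task_done()

def process_domain(mx_hosts, started):
    q = queue.Queue()
    for _ in mx_hosts:            # one worker per MX host, as in the question
        t = threading.Thread(target=worker, args=(q,), daemon=True)
        t.start()
        started.append(t)
    for mx in mx_hosts:
        q.put(mx)
    q.join()                      # returns once every item has been task_done()

workers = []
fake_domains = [["mx1.example.", "mx2.example.", "mx3.example."]] * 5   # hypothetical data
for mx_hosts in fake_domains:
    process_domain(mx_hosts, workers)

# Every worker is still alive, blocked in q.get() on a queue nobody feeds any more
print(sum(t.is_alive() for t in workers))  # 15
```

With 280k domains, this is how the thread count keeps growing until the OS refuses to create more.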

+3




3 answers


I don't understand why you need a Queue in the first place.
In your design, each thread processes exactly one task.
You can pass this task to the thread when you create it.
This way you don't need the Queue at all:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

      

Then you can get rid of the while-loop inside the run-method:

def run(self):
    res = dns.resolver.Resolver()
    res.lifetime = 1.5
    res.timeout = 0.5

    try:
        answers = res.query(self.mx,'AAAA')
        ip_mx = str(answers[0])
    except:
        ip_mx = "N/A"

    with lock:
        sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(self.id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback()

        print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

      



Create one thread for each task:

threads = []  # a new list for each domain
for mx in answers:
    t = MX_getAAAA_thread(id_domain, mx.exchange)
    t.setDaemon(True)
    threads.append(t)
    t.start()

      

and join them:

for thread in threads:
    thread.join()
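Put together, the one-thread-per-task approach can be sketched as follows in Python 3. lookup_aaaa and the rows list are hypothetical stand-ins for the DNS query and the database insert, so the example runs without network access:

```python
import threading

lock = threading.Lock()
rows = []   # hypothetical stand-in for the mx table

def lookup_aaaa(mx):
    # Hypothetical stub for res.query(mx, 'AAAA'); it does no network access
    return "N/A"

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

    def run(self):
        ip_mx = lookup_aaaa(self.mx)
        with lock:
            rows.append((self.id_domain, str(self.mx), ip_mx))
        # run() returns here, so the thread finishes and join() can return

def process_domain(id_domain, mx_hosts):
    threads = []   # a fresh list per domain: only this domain's threads are joined
    for mx in mx_hosts:
        t = MX_getAAAA_thread(id_domain, mx)
        t.daemon = True
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return threads

all_threads = []
for id_domain in range(5):
    all_threads += process_domain(id_domain, ["mx1.example.", "mx2.example."])

print(len(rows), any(t.is_alive() for t in all_threads))  # 10 False
```

Because every run() returns, each join() completes and no threads accumulate between domains.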

      

+3




What I often do when my thread contains such an endless loop is to change the condition to something I can control from outside. For example:

def run(self):
    self.keepRunning = True
    while self.keepRunning:
        # do stuff

      

This way I can set the keepRunning property to False from outside, and the thread terminates gracefully the next time the loop condition is checked.
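One caveat: a thread that is blocked in queue.get() does not re-check the loop condition until get() returns. A minimal Python 3 sketch, assuming the items come from a queue.Queue, gives get() a timeout so the keepRunning flag is seen within about 0.1 s. Unlike the snippet above, it sets the flag in __init__, so a stop requested before run() starts is not overwritten:

```python
import queue
import threading

class Worker(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.queue = q
        self.keepRunning = True   # set before start(), so an early stop is never lost
        self.processed = 0

    def run(self):
        while self.keepRunning:
            try:
                # The timeout makes get() give up periodically, so the flag is re-checked
                item = self.queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self.processed += 1   # "do stuff" with item here
            self.queue.task_done()

q = queue.Queue()
workers = [Worker(q) for _ in range(3)]
for w in workers:
    w.start()
for i in range(20):
    q.put(i)
q.join()                      # all 20 items have been processed
for w in workers:
    w.keepRunning = False     # ask each thread to stop at its next loop check
for w in workers:
    w.join()                  # returns within about one timeout period
print(sum(w.processed for w in workers), any(w.is_alive() for w in workers))  # 20 False
```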

Btw., since you seem to spawn exactly one thread for each item you queue, you don't need loops in your threads at all. Still, I would say that you should always enforce a maximum number of threads that can be spawned this way, e.g. with for i in range(min(len(answers), MAX_THREAD_COUNT)).
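A minimal Python 3 sketch of such a cap, assuming all MX hosts of a domain are queued before the workers start (MAX_THREAD_COUNT and the host names are hypothetical). Since fewer threads than items may exist, each worker loops, takes items with get_nowait(), and returns as soon as the queue is empty, so plain join() calls work:

```python
import queue
import threading

MAX_THREAD_COUNT = 4   # hypothetical cap; tune it for your system

def worker(q, out, lock):
    while True:
        try:
            mx = q.get_nowait()   # the queue was filled before the workers started
        except queue.Empty:
            return                # nothing left: leave the function so join() can complete
        with lock:
            out.append(str(mx))   # stand-in for the AAAA lookup and database insert

def process_domain(mx_hosts):
    q = queue.Queue()
    for mx in mx_hosts:
        q.put(mx)
    out, lock = [], threading.Lock()
    n = min(len(mx_hosts), MAX_THREAD_COUNT)
    threads = [threading.Thread(target=worker, args=(q, out, lock)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return n, out

n, out = process_domain(["mx%d.example." % i for i in range(10)])
print(n, len(out))  # 4 10
```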

Alternative



In your case, instead of terminating the threads in each iteration of the for-loop, you can simply reuse them. From what I can see in your code, the only thing that ties a thread to a particular iteration is its id_domain property, which you set when you create it. However, you can just put id_domain into the queue as well, so that the threads are completely independent of the domain and can be reused.

It might look like this:

qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()

for id_domain, domain in enumerateIdDomains():  # assumed to yield (id_domain, domain) pairs
    answers = resolver.query(domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple

qMX.join()

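for thread in threads:
    # threads blocked in queue.get() only re-check this flag after their next get() returns;
    # as daemon threads they do not keep the process alive at exit
    thread.keepRunning = False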

      

Of course, you need to change your thread class a little:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff, then call self.queue.task_done() so that qMX.join() can return
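For the reused threads to terminate, something has to wake them from get(). The sketch below is a Python 3 variant of the code above that swaps the keepRunning flag for a different technique, a sentinel value: after qMX.join(), one STOP item per thread is queued, and a thread that receives it returns from run(). enumerate_domains, MAX_THREAD_COUNT and the rows list are hypothetical stand-ins for the database and DNS parts:

```python
import queue
import threading

MAX_THREAD_COUNT = 4   # hypothetical cap
STOP = None            # sentinel: one per thread tells it to leave run()

lock = threading.Lock()
rows = []              # hypothetical stand-in for the mx table

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.queue = q

    def run(self):
        while True:
            item = self.queue.get()
            if item is STOP:
                self.queue.task_done()
                return                  # thread ends, so join() can complete
            id_domain, mx = item
            ip_mx = "N/A"               # stand-in for the AAAA lookup
            with lock:
                rows.append((id_domain, mx, ip_mx))
            self.queue.task_done()

def enumerate_domains():
    # Hypothetical: replaces reading domains from the database plus the MX lookup
    for id_domain in range(3):
        yield id_domain, ["mx1.d%d.example." % id_domain, "mx2.d%d.example." % id_domain]

qMX = queue.Queue()
threads = [MX_getAAAA_thread(qMX) for _ in range(MAX_THREAD_COUNT)]
for t in threads:
    t.daemon = True
    t.start()

for id_domain, mx_hosts in enumerate_domains():
    for mx in mx_hosts:
        qMX.put((id_domain, mx))   # the tuple carries the domain id to any thread

qMX.join()                         # every lookup has been processed
for _ in threads:
    qMX.put(STOP)                  # one sentinel per thread
for t in threads:
    t.join()
print(len(rows), any(t.is_alive() for t in threads))  # 6 False
```

Each thread consumes exactly one sentinel, because it returns right after receiving it.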

      

+5




Joining the threads will do the trick, but in your case the joins block indefinitely because your threads never leave the while loop in run(). The run method has to return before a thread can be joined.
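A minimal Python 3 sketch of the difference (the thread functions and the None stop marker are hypothetical): join(timeout=...) on a thread whose loop never ends gives up with the thread still alive, while a thread that returns from its function can be joined normally:

```python
import queue
import threading

def endless(q):
    while True:               # like the question's run(): never returns
        q.get()
        q.task_done()

def finite(q):
    while True:
        item = q.get()
        q.task_done()
        if item is None:      # hypothetical stop marker
            return            # leaving the function is what lets join() finish

q1, q2 = queue.Queue(), queue.Queue()
t1 = threading.Thread(target=endless, args=(q1,), daemon=True)
t2 = threading.Thread(target=finite, args=(q2,), daemon=True)
t1.start()
t2.start()

q2.put(None)
t1.join(timeout=0.5)          # gives up after 0.5 s; a plain join() would hang forever
t2.join()
print(t1.is_alive(), t2.is_alive())  # True False
```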

+2

