Polling SQS and adding messages to a Python queue for processing dies

I have a piece of multithreaded code: 3 threads that poll data from SQS and add it to a Python queue, and 5 threads that take messages from the Python queue, process them, and send them to a backend system.

Here is the code:

import time
import threading
import Queue

from urllib3 import PoolManager

# sqs_queue (a boto SQS queue handle) and backend_endpoint are set up
# elsewhere in the original script.

python_queue = Queue.Queue()

class GetDataFromSQS(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue

    def run(self):
        while True:
            time.sleep(0.5)  # sleep for half a second before polling again
            try:
                msgs = sqs_queue.get_messages(10)
                if not msgs:
                    print "sqs is empty now"
                for msg in msgs:
                    #place each message block from sqs into python queue for processing
                    self.python_queue.put(msg)
                    print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
                    #delete from sqs
                    sqs_queue.delete_message(msg)
            except Exception as e:
                print "Exception in GetDataFromSQS :: " +  e


class ProcessSQSMsgs(threading.Thread):
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue
        self.pool_manager = PoolManager(num_pools=6)

    def run(self):
        while True:
            #grabs the message to be parsed from sqs queue
            python_queue_msg = self.python_queue.get()
            try:
                processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
            except Exception as e:
                print "Error parsing:: " + e
            finally:
                self.python_queue.task_done()

def processMsgAndSendToBackend(msg, pool_manager):
    if msg != "":
        ###### All the code related to processing the msg goes here (elided), producing the iterable processedMsg
        for individualValue in processedMsg:
            try:
                response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
                if response is None:
                    print "Error: empty response from backend"
                else:
                    response.release_conn()
            except Exception as e:
                print "Exception! Post data to backend: " + e


def startMyPython():
    #spawn a pool of threads, and pass them queue instance
    for i in range(3):
        sqsThread = GetDataFromSQS(python_queue)
        sqsThread.start()

    for j in range(5):
        parseThread = ProcessSQSMsgs(python_queue)
        #parseThread.setDaemon(True)
        parseThread.start()

    #wait on the queue until everything has been processed
    python_queue.join()
    # python_queue.close() -- should i do this?

startMyPython()

Problem: 3 Python threads die randomly (monitored using top -p -H) every few days, and everything is fine again once I kill the process and restart the script. I suspect the threads that disappear are the 3 GetDataFromSQS threads. And since GetDataFromSQS dies, the other 5 workers, although alive, are always sleeping, as there is no data in the Python queue. I'm not sure what I am doing wrong here, as I am fairly new to Python; I followed this tutorial for the thread-and-queue logic: http://www.ibm.com/developerworks/aix/library/au-threadingpython/

Thanks in advance for your help. I hope I have explained my problem clearly.
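As a stopgap while the root cause is investigated, a watchdog thread can detect and respawn dead producers. The sketch below is not from the original post: it assumes the GetDataFromSQS class above, and superviseProducers and MONITOR_INTERVAL are illustrative names.

MONITOR_INTERVAL = 60  # seconds between liveness checks (arbitrary choice)

def superviseProducers(python_queue, num_producers=3):
    producers = [GetDataFromSQS(python_queue) for _ in range(num_producers)]
    for t in producers:
        t.start()
    while True:
        time.sleep(MONITOR_INTERVAL)
        for i, t in enumerate(producers):
            if not t.isAlive():
                # the thread died; log it and start a replacement
                print "Producer %d died, respawning" % i
                producers[i] = GetDataFromSQS(python_queue)
                producers[i].start()

With this in place, startMyPython() would call superviseProducers(python_queue) instead of starting the 3 GetDataFromSQS threads directly.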



1 answer


The thread-hanging issue was related to getting the SQS queue handle. I used IAM to manage credentials and the boto SDK to connect to SQS.

The root cause was that the boto package reads auth metadata from AWS (the instance metadata service), and that call was interrupted from time to time.

The fix is to edit the boto config to increase the number of attempts made for the auth call to AWS:



[Boto]
metadata_service_num_attempts = 5

( https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E )
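If editing the boto config file (~/.boto or /etc/boto.cfg) is not convenient, the same option can be set programmatically before any SQS connection is created. A minimal sketch, assuming it runs at script startup; boto.config is a ConfigParser subclass, so the standard has_section/add_section/set calls apply:

import boto

# Make sure the [Boto] section exists, then raise the retry count for the
# instance metadata (auth) call.
if not boto.config.has_section('Boto'):
    boto.config.add_section('Boto')
boto.config.set('Boto', 'metadata_service_num_attempts', '5')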


