Pika: how to consume messages synchronously

I would like to periodically run a process (for example, once every 10 minutes or once an hour) that receives all messages from the queue, processes them, and then exits. Is there a way to do this with pika, or should I use a different Python library?



3 answers


I think the ideal solution here would be to use the basic_get method. It fetches a single message, and if the queue is already empty it returns None. The advantage is that you can empty the queue with a simple loop and break out of it as soon as None is returned. It is also safe to run basic_get with multiple consumers.

This example is based on my own library, amqpstorm, but you can easily implement the same with pika.



from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, so break the loop and stop the application.
        break
    print("Message:", result['body'])
    channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
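
For readers who want to stay with pika, the same drain-until-empty loop might look roughly like the sketch below. It assumes pika 1.x, where the blocking channel's basic_get returns a (method, properties, body) tuple and signals an empty queue with (None, None, None) rather than a bare None. The helper names drain_queue and run_once, the handle callback, and the local broker address are illustrative assumptions, not part of the original answer.

```python
def drain_queue(channel, queue_name, handle):
    """Fetch messages one at a time until the queue is empty.

    Returns the number of messages handled.
    """
    handled = 0
    while True:
        method, properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:
            # pika returns (None, None, None) when the queue is empty.
            break
        handle(body)
        # Acknowledge only after the message was handled successfully.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        handled += 1
    return handled


def run_once(queue_name="simple_queue"):
    """Hypothetical entry point: connect, drain the queue, disconnect."""
    import pika  # imported here so drain_queue can be exercised without a broker

    # Assumes a RabbitMQ broker on localhost with default credentials.
    connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name)
        return drain_queue(channel, queue_name, lambda body: print("Message:", body))
    finally:
        connection.close()
```

Calling run_once() from cron or another scheduler every ten minutes would give the run, process, exit behaviour the question asks for.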

      



Would this work for you?

  • Measure the current queue length as N = queue.method.message_count.

  • Have the callback count the processed messages, and as soon as N are processed, call channel.stop_consuming.


So the client code would be something like this:

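import pika

class CountCallback(object):
    def __init__(self, count):
        self.count = count

    def __call__(self, ch, method, properties, body):
        # process the message here, then acknowledge it
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.count -= 1
        if not self.count:
            ch.stop_consuming()

# Assumes a RabbitMQ broker running on localhost.
conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = conn.channel()
queue = channel.queue_declare('tasks')
count = queue.method.message_count
# Skip consuming when the queue is empty; otherwise the count never
# reaches zero and start_consuming() would block forever.
if count:
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue='tasks')
    channel.basic_consume(queue='tasks', on_message_callback=CountCallback(count))
    channel.start_consuming()
conn.close()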
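
A variation that does not depend on a count measured up front is pika's generator-style channel.consume with an inactivity_timeout, stopping once the queue has been quiet for a while. This is a different technique from the counting callback above, sketched here assuming pika 1.x; consume_until_idle, the handle callback, and the five-second default are hypothetical choices.

```python
def consume_until_idle(channel, queue_name, handle, idle_seconds=5):
    """Consume messages until none arrive for idle_seconds.

    Returns the number of messages handled.
    """
    handled = 0
    for method, properties, body in channel.consume(
        queue_name, inactivity_timeout=idle_seconds
    ):
        if method is None:
            # With inactivity_timeout set, pika yields (None, None, None)
            # when no message arrived within the timeout.
            break
        handle(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        handled += 1
    # Cancel the consumer that consume() created on this channel.
    channel.cancel()
    return handled
```

Compared with counting, this also picks up messages published while the run is in progress, at the cost of waiting out one idle period before exiting.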

      



Here is @eandersson's example, updated for amqpstorm 2.6.1:

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, so break the loop and stop the application.
        break
    print("Message:", result.body)
    channel.basic.ack(result.method['delivery_tag'])
channel.close()
connection.close()

      
