Pika: how to consume messages synchronously
3 answers
I think the ideal solution here is to use the basic_get method. It fetches a single message, and if the queue is already empty it returns None. The advantage is that you can empty the queue with a simple loop and break out of it as soon as None is returned. It is also safe to run basic_get with multiple consumers.
This example is based on my own library, amqpstorm, but you can easily implement the same with pika.
from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, let's break the loop and stop the application.
        break
    print("Message:", result['body'])
    channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
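For comparison, here is a rough sketch of the same drain loop written against pika instead of amqpstorm. It assumes pika 1.x, where BlockingChannel.basic_get returns a (method, properties, body) tuple and (None, None, None) when the queue is empty. The helper names drain_queue and main are made up for illustration, and the host and queue name are simply carried over from the example above.

```python
def drain_queue(channel, queue_name):
    """Fetch messages one at a time with basic_get until the queue is empty."""
    bodies = []
    while True:
        # Assumes pika 1.x: an empty queue yields (None, None, None).
        method, properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:
            # Queue is empty, stop polling.
            break
        bodies.append(body)
        # Acknowledge only after the message has been handled.
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return bodies


def main(host='127.0.0.1', queue_name='simple_queue'):
    # Hypothetical entry point; pika is imported here so that
    # drain_queue itself has no dependency on a running broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    for body in drain_queue(channel, queue_name):
        print("Message:", body)
    connection.close()
```

Calling main() against a local broker should print whatever is currently in simple_queue and then return, since the loop exits on the first empty basic_get.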
+6
Will this work for you?
- Measure the current queue length as N = queue.method.message_count.
- Count the processed messages in the callback and, as soon as N of them have been processed, call channel.stop_consuming.
So the client code would be something like this:
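import pika

class CountCallback(object):
    def __init__(self, count):
        self.count = count

    def __call__(self, ch, method, properties, body):
        # process the message here, then acknowledge it
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.count -= 1
        if self.count <= 0:
            ch.stop_consuming()

conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = conn.channel()
queue = channel.queue_declare('tasks')
count = queue.method.message_count
# With an empty queue the callback never fires, so start_consuming() would block forever.
if count:
    callback = CountCallback(count)
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue='tasks')
    channel.basic_consume(queue='tasks', on_message_callback=callback)
    channel.start_consuming()
conn.close()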
+3
This example is based on my own library, amqpstorm, but you can easily implement the same with pika.
Updated for amqpstorm 2.6.1:
from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, let's break the loop and stop the application.
        break
    print("Message:", result.body)
    channel.basic.ack(result.method['delivery_tag'])
channel.close()
connection.close()
0