RabbitMQ consumer connection dies after 90 seconds of inactivity
I have a RabbitMQ task queue and a Pika consumer that consumes these tasks (using acks). The problem is that the connection dies after 90 seconds of inactivity, but my tasks often take longer than that. This means that while tasks are still being processed, they are returned to the task queue and never acked.
I am using RabbitMQ 3.5.3 and Pika 0.9.14 with the channel.basic_consume() method. The connection has a heartbeat_interval of 30 seconds.
Consumer code:
import pika
from time import sleep
RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"
def callback(ch, method, properties, body):
    print body
    sleep(91)  # if sleep value < 90 this code works (even 89)
    ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()
Traceback:
Traceback (most recent call last):
  File "main.py", line 19, in <module>
    channel.basic_consume(callback, queue=QUEUE_NAME)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
    if self._read_poller.ready():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
    self.poll_timeout)
select.error: (9, 'Bad file descriptor')
The problem is that while you sleep for that long, pika cannot respond to heartbeat requests from RabbitMQ, and when that happens RabbitMQ closes the connection.
The way around this is to either disable heartbeats, or sleep in smaller intervals and call process_data_events() regularly in between so that pika can process the heartbeats.
For example, something like this:
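import time

def amqp_sleep(connection, time_to_sleep=20, step=5):
    # Sleep in short steps so pika can service heartbeats in between.
    remaining = time_to_sleep
    while remaining > 0:
        connection.process_data_events()
        pause = min(step, remaining)  # never sleep past the requested total
        time.sleep(pause)
        remaining -= pause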
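If the callback does real work rather than sleeping, the same idea can be sketched by moving the work into a worker thread while the consumer thread keeps calling process_data_events(). This is only a rough sketch under a few assumptions: the names run_with_heartbeats, make_callback, handle_body and poll_interval are made up for illustration, and the only connection method used is process_data_events(), as above. All pika calls, including the ack, stay on the consumer thread, since BlockingConnection is not safe to share between threads.

```python
import threading


def run_with_heartbeats(connection, task, poll_interval=1.0):
    """Run task() in a worker thread while this thread services the connection.

    Hypothetical helper: `connection` only needs a process_data_events()
    method, such as the pika BlockingConnection used in the question.
    """
    outcome = {}

    def worker():
        # Only the long-running work happens here; no pika calls.
        try:
            outcome["value"] = task()
        except Exception as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    while thread.is_alive():
        # Give pika a chance to read and answer heartbeat frames.
        connection.process_data_events()
        # Wait up to poll_interval seconds for the worker to finish.
        thread.join(poll_interval)
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")


def make_callback(connection, handle_body, poll_interval=1.0):
    """Build a basic_consume callback that acks after handle_body(body) returns."""

    def callback(ch, method, properties, body):
        run_with_heartbeats(connection, lambda: handle_body(body), poll_interval)
        # The ack is sent from the consumer thread, not from the worker.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

With the consumer from the question, this would be wired up as channel.basic_consume(make_callback(connection, my_task), queue=QUEUE_NAME), where my_task is whatever function processes the message body. If the task raises, the message is not acked and the exception propagates to the consumer loop.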
Personally, though, I would go for a library that handles heartbeats automatically in the background so you don't have to deal with them, for example rabbitpy or my own amqp-storm.