RabbitMQ consumer connection dies after 90 seconds of inactivity

I have a RabbitMQ task queue and a Pika consumer to consume these tasks (using acks). The problem is that the connection freezes after 90 seconds of inactivity, but my tasks often take longer. This means that although tasks are still being evaluated, they are returned to the task queue and never called.

Using RabbitMQ 3.5.3 and Pika 0.9.14 with channel.basic_consume () method. The connection has a heartbeat_interval of 30 seconds.

Use code:

import pika
from time import sleep

RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"


def callback(ch, method, properties, body):
    print body
    sleep(91)  # if sleep value < 90 this code works (even 89)
    ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()

      

Traceback:

Traceback (most recent call last):
  File "main.py", line 19, in <module>
    channel.basic_consume(callback, queue=QUEUE_NAME)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
    if self._read_poller.ready():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
    self.poll_timeout)
select.error: (9, 'Bad file descriptor')

      

+3


source to share


1 answer


The problem is that since you sleep for so long pika cannot respond to heartbeat requests from RabbitMQ and when this happens RabbitMQ will close the connection.

The only way to get around this is to either turn off the pulse, or hibernate at smaller intervals, and run process_data_events()

continuously so the pika can process the heartbeats.

eg. something like that



def amqp_sleep(connection, time_to_sleep=20):
    remaining = time_to_sleep
    while remaining > 0:
        connection.process_data_events()
        time.sleep(5)
        remaining -= 5

      

Personally, though I would go for a library that automatically handles heartbeats in the background, so you don't have to deal with them for example. rabbitpy or my own amqp-storm .

+2


source







All Articles