Spring AMQP - does @RabbitListener poll under the hood?

Summary

I want to consume messages from an AMQP / RabbitMQ queue asynchronously. To do this, I implemented a method annotated with @RabbitListener (from spring-rabbit), but it seems that this listener actually polls my queue under the hood. Is that to be expected? I would have expected the listener to be notified by RabbitMQ rather than to poll.

If this is to be expected, is there a way to consume messages asynchronously with Spring AMQP without polling?

What I observed

When I send a message, it is picked up correctly by the listener. However, I then see a continuous stream of log messages that suggest the listener keeps polling the empty queue:

...
15:41:10.543 [pool-1-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - ConsumeOK : Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.544 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [], routingKey = [myQueue]
Sent: Hello World
15:41:10.559 [pool-1-thread-4] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.560 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'Hello World'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myQueue, deliveryTag=1, messageCount=0])
15:41:10.571 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=Hello World, headers={timestamp=1435844470571, id=018f39f6-ebca-aabf-7fe3-a095e959f65d, amqp_receivedRoutingKey=myQueue, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=myQueue, amqp_consumerTag=amq.ctag-bUsK4KQN6_QHzf8DoDC_ww, amqp_contentEncoding=UTF-8, contentType=text/plain, amqp_deliveryTag=1, amqp_redelivered=false}]]
Received: Hello World
15:41:10.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:11.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:12.583 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1), acknowledgeMode=MANUAL local queue size=0
...

      

The last log message basically repeats itself indefinitely every second.

My test code

The first two methods are probably the most interesting part; the rest is mostly Spring config:

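import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration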
@EnableRabbit
public class MyTest {

    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appCtxt =
                new AnnotationConfigApplicationContext(MyTest.class)) {
            // send a test message
            RabbitTemplate template = appCtxt.getBean(RabbitTemplate.class);
            Queue queue = appCtxt.getBean(Queue.class);
            template.convertAndSend(queue.getName(), "Hello World");
            System.out.println("Sent: Hello World");

            // Now that the application with its message listeners is running,
            // block this thread forever; make sure, though, that the
            // application context can sanely be closed.
            appCtxt.registerShutdownHook();
            Object blockingObj = new Object();
            synchronized (blockingObj) {
                blockingObj.wait();
            }
        }
    }

    @RabbitListener(queues = "#{ @myQueue }")
    private void processHello(@Payload String msg,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        System.out.println("Received: " + msg);
        channel.basicAck(deliveryTag, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnFactory());
    }

    @Bean
    public ConnectionFactory rabbitConnFactory() {
        return new CachingConnectionFactory();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory
            rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory result =
                new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(rabbitConnFactory());
        result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return result;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", false);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(rabbitConnFactory());
    }
}

      

1 answer


It is not polling RabbitMQ. When a message arrives asynchronously from RabbitMQ, it is placed in an internal queue in the consumer, which hands the message over to the listener thread that is blocked there, waiting for an arrival.

The DEBUG message you are seeing is logged after the listener thread times out while waiting for a new message to arrive from RabbitMQ.
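The hand-off described above can be sketched with a plain java.util.concurrent.BlockingQueue. This is only an illustration of the pattern, not Spring AMQP's actual BlockingQueueConsumer code; the class HandOffSketch and its methods onDelivery and nextMessage are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not Spring AMQP's real implementation.
final class HandOffSketch {

    // Stands in for the consumer's internal queue ("local queue" in the logs).
    private final BlockingQueue<String> localQueue = new LinkedBlockingQueue<>();

    // Called by the client library's thread when the broker pushes a delivery.
    void onDelivery(String body) {
        localQueue.add(body);
    }

    // Called in a loop by the listener thread. Blocks on the in-memory queue
    // until a delivery is available or the timeout elapses; returns null on
    // timeout. No request is sent to the broker here.
    String nextMessage(long receiveTimeoutMillis) {
        try {
            return localQueue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null; // treat an interrupt like a timeout
        }
    }

    public static void main(String[] args) {
        HandOffSketch consumer = new HandOffSketch();
        // Simulate the broker pushing a message from another thread.
        new Thread(() -> consumer.onDelivery("Hello World")).start();
        System.out.println("Received: " + consumer.nextMessage(1000));
        // Nothing else arrives, so this call returns null after the timeout.
        System.out.println("After timeout: " + consumer.nextMessage(100));
    }
}
```

The repeated "Retrieving delivery" lines in the question correspond to the second kind of call: the wait on the local queue timed out, and the listener thread simply goes back to waiting.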

You can increase receiveTimeout to reduce the number of log messages, or simply turn off DEBUG logging for BlockingQueueConsumer.

Increasing the timeout will make the container less responsive to container stop() requests.
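A minimal configuration sketch for raising the timeout, based on the listener container factory bean from the question. It assumes spring-rabbit is on the classpath; the class name ListenerTimeoutConfig and the 10-second value are chosen purely for illustration. The value is given in milliseconds.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the setReceiveTimeout call is new
// compared with the factory bean in the question.
@Configuration
class ListenerTimeoutConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory rabbitConnFactory) {
        SimpleRabbitListenerContainerFactory result =
                new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(rabbitConnFactory);
        result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Wait up to 10 s on the internal queue before looping again.
        // Illustrative value; a larger timeout also delays reaction to stop().
        result.setReceiveTimeout(10_000L);
        return result;
    }
}
```

If the only goal is a quieter log, raising the log level for org.springframework.amqp.rabbit.listener.BlockingQueueConsumer in the logging configuration leaves the container's behavior untouched.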

EDIT

In response to your comment below ...



Yes, we could interrupt the thread instead, but it is a little more involved than that. The receive timeout is also used to ack messages when txSize is > 1.

Let's say you want to ack only every 20 messages (rather than every message). People do this to improve performance in high-volume environments. The timeout is also used to trigger the ack (txSize really means every n messages or on timeout, whichever comes first).

Now let's say 19 messages arrive, then none for 60 seconds, and your timeout is 30 seconds.

That would mean the 19 messages stay unacknowledged for a long time. With the default setting, the ack is sent 1 second after the 19th message arrives.
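The "every n messages or on timeout" rule can be simulated with the standard library alone. The sketch below is a hypothetical model, not Spring AMQP code: BatchAckSketch, consume and queueOf are made-up names, and instead of acking on a channel it just records how many messages each ack would have covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation; it does not talk to RabbitMQ or use Spring AMQP.
final class BatchAckSketch {

    // Consumes from localQueue and returns the size of each ack that would
    // have been sent. For the demo, the loop ends at the first timeout;
    // a real container would loop back and wait again.
    static List<Integer> consume(BlockingQueue<String> localQueue,
                                 int txSize, long receiveTimeoutMillis) {
        List<Integer> ackedBatchSizes = new ArrayList<>();
        int pending = 0; // messages received but not yet acked
        while (true) {
            String msg = pollQuietly(localQueue, receiveTimeoutMillis);
            if (msg != null) {
                pending++;
                if (pending == txSize) { // batch is full: ack now
                    ackedBatchSizes.add(pending);
                    pending = 0;
                }
            } else {
                if (pending > 0) { // timeout: ack the partial batch
                    ackedBatchSizes.add(pending);
                }
                return ackedBatchSizes;
            }
        }
    }

    private static String pollQuietly(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null; // treat an interrupt like a timeout
        }
    }

    // Builds a queue preloaded with n dummy messages.
    static BlockingQueue<String> queueOf(int n) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= n; i++) {
            queue.add("message-" + i);
        }
        return queue;
    }

    public static void main(String[] args) {
        // 19 messages with txSize 20: one ack of 19, sent when the wait times out.
        System.out.println(consume(queueOf(19), 20, 100)); // prints [19]
        // 45 messages: two full batches, then the remaining 5 on timeout.
        System.out.println(consume(queueOf(45), 20, 100)); // prints [20, 20, 5]
    }
}
```

In this model the partial batch of 19 is acked one timeout after the last message, which is why a long receive timeout would leave those messages unacknowledged for just as long.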

There is really very little overhead in this timeout (we just loop back and wait again), so it is unusual to increase it.

Also, although the container is stopped when the context is closed, that is not the only case: people stop and start containers all the time.
