Rabbitmq Header Exchange and Confirmed Delivery

I am trying to use Header Exchange on RabbitMQ with mixed java and python components and I need confirmed delivery.

I seem to treat the behavior differently from python (pika) and java clients.

In python:

channel.exchange_declare(exchange='headers_test',
¦   ¦   ¦   ¦   ¦   ¦   ¦type='headers',
¦   ¦   ¦   ¦   ¦   ¦   ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦   ¦   ¦   ¦   ¦   ¦ routing_key='',
¦   ¦   ¦   ¦   ¦   ¦ mandatory=True,
¦   ¦   ¦   ¦   ¦   ¦ body=message,
¦   ¦   ¦   ¦   ¦   ¦ properties=pika.BasicProperties(
¦   ¦   ¦   ¦   ¦   ¦   ¦ delivery_mode=2,
¦   ¦   ¦   ¦   ¦   ¦   ¦ headers=message_headers))

      

If the headers do not match any associated consumer and the message cannot be redirected, the result is false

But in java / scala:

channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect

val props = MessageProperties.PERSISTENT_BASIC.builder
¦   ¦   ¦   ¦  .headers(messageHeaders).build
channel.basicPublish("headers_test", 
¦   ¦   ¦   ¦   ¦   ¦"", //routingKey
¦   ¦   ¦   ¦   ¦   ¦true, //mandatory
¦   ¦   ¦   ¦   ¦   ¦props, 
¦   ¦   ¦   ¦   ¦   ¦"data".getBytes)
channel.waitForConfirmsOrDie()

      

Here, when messageHeaders doesn't find a match, the message seems to just be discarded without error .

Am I missing something, or is the behavior of both clients really different? And how can I get a confirmed delivery using header exchange in java?

Note. I already have a "complex" exchange for setting up queue routing, I'd rather not add dead letter routing to the game and just refuse to send.

+3


source to share


1 answer


The problem is that the message is considered acknowledged even if there is no queue matching your headers. From the docs ( https://www.rabbitmq.com/confirms.html ):

In the case of overwhelming messages, the broker issues an acknowledgment after the exchange verifies that the message will not be routed to any queue (returns an empty queue list). If the message is also posted as required, basic.return is sent to the client before basic.ack. The same is true for negative confirmations (basic.nack).

Instead, you should check the basic.return message to determine if the message was forwarded or not.

I have checked with wireshark and indeed I can see that if the message is not routed there the AMQP message basic.return.

I assume you should start with

channel.addReturnListener(new ReturnListener() {
  @Override
  public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("App.handleReturn");
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
  }
});

      

And indeed, if the message was not routed, I get this:

replyCode = [312], replyText = [NO_ROUTE], exchange = [headers_logs], routingKey = [], pro ....



Also, if you want to emulate the synchronous behavior of Pika in Java, it looks like you can do so by taking the current post sequence number before posting the message and registering an acknowledgment listener instead of relying on .waitForConfirmsOrDie ().

Thus, a complete code sample would look like this:

channel.addReturnListener(new ReturnListener() {
      @Override
      public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("App.handleReturn");
        System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
      }
    });

    channel.addConfirmListener(new ConfirmListener() {
      @Override
      public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("App.handleAck");
        System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
      }

      @Override
      public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("App.handleNack");
        System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
      }
});

long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);

channel.basicPublish("headers_logs",
    "",
     true,
     props,
    "data".getBytes());

      

And inside the return / confirm callback, you need to find the posting order of the channel that you received before posting the post.

If you look at what happens on the wire, in case the message was not forwarded to any queue, RabbitMq sends one basic.return message that also contains a confirmation (delivery tag). If the message was redirected, RabbitMq sends back one bacic.ack message that also contains an acknowledgment.

It seems that the RabbitMq Java client always calls the basicReturn () callback before basicConfirm (), so the logic to determine if a message has been routed or not could be like this:

Register the return and confirm listening on the channel; Remember the channel of the next serial number of the publication; Wait for a refund or confirmation. If this is a callback, the message has not been routed and you should ignore further confirmation for the same delivery tag. If you receive a handleAck () callback before you receive handleReturn (), it means the message has been routed to the queue.

Though I'm not sure in what case .handleNack () might be called.

+1


source







All Articles