ActiveMQ: why doesn't RedeliveryCounter increase when the consumer disconnects before acknowledging?

My consumer runs on an application server. This is my ActiveMQ configuration:

optimizeAcknowledge=true  
optimizeAcknowledgeTimeOut=5000  
destination=queue.test.msg?consumer.prefetchSize=10  

      

After I sent 10 messages, the web console (http://localhost:8161/admin) showed 3 messages that had not been acknowledged (their acks were being held back by optimizeAcknowledge). When I restarted the client server at this point, I received those 3 messages again, with RedeliveryCounter = 0. So I cannot tell whether these are redelivered messages or messages newly sent by the producer.

The same problem occurs in a transaction. I ran the test class org/apache/activemq/RedeliveryPolicyTest.java, method testRepeatedRedeliveryReceiveNoCommit, paused in the debugger at "connection.close();" and stopped the process there. The message's RedeliveryCounter is also 0.

Question: to determine reliably whether a message is a redelivery, do I have to keep a store of the IDs of all messages the consumer has processed, and check each received message against it?

Code below:

public void testRepeatedRedeliveryReceiveNoCommit() throws Exception {
    connection.start();
    Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    ActiveMQQueue destination = new ActiveMQQueue("TEST");
    MessageProducer producer = dlqSession.createProducer(destination);

    // Send the messages
    producer.send(dlqSession.createTextMessage("1st"));

    dlqSession.commit();
    MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));

    final int maxRedeliveries = 4;
    for (int i=0;i<=maxRedeliveries +1;i++) {

        connection = (ActiveMQConnection)factory.createConnection(userName, password);
        connections.add(connection);
        // Receive a message with the JMS API
        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(0);
        policy.setUseExponentialBackOff(false);
        policy.setMaximumRedeliveries(maxRedeliveries);

        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(destination);

        ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
        if (i<=maxRedeliveries) {
            m.setRedeliveryCounter(1);
            System.out.println("Text:"+ m.getText());
            System.out.println("RedeliveryCounter:" + m.getRedeliveryCounter());
        } else {
            System.out.println("null on exceeding redelivery count:" + m);
        }
        connection.close();
        connections.remove(connection);
    }

    // We should be able to get the message off the DLQ now.
    TextMessage m = (TextMessage)dlqConsumer.receive(1000);
    System.out.println("Got message from DLQ:" + m);
    System.out.println("Text:"+ m.getText());
    String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
    System.out.println("cause exception has policy ref:" +  cause.contains("RedeliveryPolicy"));
    dlqSession.commit();
}
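
For reference, the ID-store approach asked about in the question could look roughly like the sketch below. This is only an illustration under assumptions: ProcessedMessageStore and markIfFirst are hypothetical names, not part of ActiveMQ or JMS, and the store is in-memory only, so it would not survive the client restart described above (a durable store, such as a database table, would be needed for that case).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not an ActiveMQ or JMS class): remembers the IDs of
 * messages that were already processed so duplicates can be detected.
 */
final class ProcessedMessageStore {
    private final Map<String, Boolean> seen;

    ProcessedMessageStore(final int capacity) {
        // Insertion-ordered map that drops the oldest ID once capacity is exceeded,
        // wrapped so that it can be shared between consumer threads.
        this.seen = Collections.synchronizedMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    /** Returns true the first time an ID is seen, false if it is already recorded. */
    boolean markIfFirst(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }
}
```

In a consumer, the check would be something like store.markIfFirst(message.getJMSMessageID()) before processing, skipping the message when it returns false. The JMS API also exposes message.getJMSRedelivered() and the JMSXDeliveryCount property; whether the broker updates them in the disconnect scenario above is exactly what this question is about.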

      
