ActiveMQ multithreaded JMS client

I am using the below code to create multiple JMS sessions for multiple consumers to consume messages. My problem is the code is running in single threaded mode. Even if there are messages in the queue, the second thread cannot receive anything and simply polls. The first thread, meanwhile, finishes processing the first batch and returns and consumes the remaining messages. Is there something wrong with that here?

static {
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616");
        connection = connectionFactory.createConnection();
        connection.start();
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
    }

}

public JMSClientReader(boolean isQueue, String name) throws QueueException {

    init(isQueue,name);
}

@Override
public void init(boolean isQueue, String name) throws QueueException
{

    // Create a Connection
    try {
        // Create a Session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        if (isQueue) {
            destination = new ActiveMQQueue(name);// session.createQueue("queue");
        } else {
            destination = new ActiveMQTopic(name);// session.createTopic("topic");
        }
        consumer = session.createConsumer(destination);
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
        throw new QueueException(e);
    }
}

public String readQueue() throws QueueException {

    // connection.setExceptionListener(this);
    // Wait for a message
    String text = null;
    Message message;
    try {
        message = consumer.receive(1000);
        if(message==null)
            return "done";
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            LOGGER.info("Received: " + text);
        } else {
            throw new JMSException("Invalid message found");
        }
    } catch (JMSException e) {
        LOGGER.error("Unable to read message from Queue", e);
        throw new QueueException(e);
    }


    LOGGER.info("Message read is " + text);
    return text;

}

      

0


source to share


1 answer


Your problem is prefetchPolicy.

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

      

all messages are sent to the first consumer connected, and when another connects, it receives no messages, so to change this behavior, if you have a parallel consumer for the queue, you need to set prefetchPolicy to less than the default. for example add this jms.prefetchPolicy.queuePrefetch=1

to uri config in activemq.xml file or set it to client url like this

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1");

      



Large prefetch values ​​are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that the consumer only processes one message at a time. However, specifying a prefetch prefix will cause the consumer to poll for messages, one at a time, rather than being pushed towards the consumer for the message.

Take a look at http://activemq.apache.org/what-is-the-prefetch-limit-for.html

AND

http://activemq.apache.org/destination-options.html

+4


source







All Articles