Processing multiple messages in parallel with ActiveMQ

I would like to process messages in a queue in parallel using a simple Processor / AsyncProcessor as the destination. The processor takes a little time per message, but each message can be processed separately and therefore at the same time (within healthy bounds).

I'm having a hard time finding examples especially about camel routes xml configuration.

So far I've defined the threadpool, route and cpu:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
    <from uri="broker:queue:inbox" />
    <threads executorServiceRef="smallPool">
        <to uri="MyProcessor" />
    </threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />

      

and my processor looks like this:

public class MyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        String msg = in.getBody(String.class);      
        System.out.println(msg);
        try {
            Thread.sleep(10 * 1000); // Do something in the background
        } catch (InterruptedException e) {}
        System.out.println("Done!");
    }
}

      

Unfortunately, when I post messages to the queue, they are still processed one by one, each delayed by 10 seconds (my "background task").

Can anyone point me in the right direction so that messages are processed with a specific thread or explain what I am doing wrong?

+3


source to share


1 answer


You have to use the concurrentConsumers parameters as pointed out in the comments,

<route>
    <from uri="broker:queue:inbox?concurrentConsumers=5" />
    <to uri="MyProcessor" />
</route>

      

Note that there is also maxConcurrentConsumers

, you can set a minimum range for simultaneous consumers, so Camel will grow / shrink automatically based on load.



See the JMS docs at

+5


source







All Articles