Camel: is it possible to implement request / response and competing consumers with an asynchronous processor?

I am struggling with an implementation of a Request / Reply exchange that needs to be handled concurrently by multiple competing consumers .

I have one standalone Master module that is responsible for creating a task queue. And I have many Worker modules that need to consume messages from this queue at the same time.

This Master is part of Camel routing:

from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        EdgarFullIndexLocation location = 
            exchange.getIn().getBody(EdgarFullIndexLocation.class);
        exchange.getIn().setBody(location.getId().toJson(), String.class);
    }
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
    //"&transactedInOut=true" + 
    "&requestTimeout=" + Integer.MAX_VALUE +
    "&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);

      

This is the working part of the Camel route where I am using a queue:

from("activemq:queue:tasksQueue?asyncConsumer=true" + 
    "&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
    .when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
        .process(new FetchIndexTaskProcessor())
    .otherwise()
        .to("log:UNKNOWN.TASK?level=DEBUG");

      

Here FetchIndexTaskProcessor implements AsyncProcessor :

public class FetchIndexTaskProcessor implements AsyncProcessor {
    @Override public void process(Exchange exchange) throws Exception {}

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        FetchIndexTask task = new FetchIndexTask(exchange, callback);
        task.start();
        return false;
    }

}

      

Here FetchIndexTask extends Thread . After start (), the new thread is responsible for:

  • Dynamically adding a route.
  • Blocking until the exchange of this route is completed.
  • Preparing the response for the initial exchange.
  • The call is callback.done(false);

    at the end.

Everything works, except for some of the competing customers - it's always one customer at a time.

I've tried many options:

  • specifying a pool of threads with .threads(10)

    in different places.
  • using endpoint parameters like asyncConsumer

    andconcurrentConsumers

But it looks like I'm missing something and I can't seem to get it to work at the same time. What is the correct way to do this?

+3


source to share


1 answer


If you are using Camel 2.9 or higher, I suggest using answerToType = Exclusive on the activemq endpoint where you are making the request / response. This tells Camel that the queue is exclusive and it speeds up since no JMS message selectors are required to select the expected correlated messages.

See the JMS Request-Response section later in the Camel JMS docs: http://camel.apache.org/jms

If you are using transient queues, this is also very fast since no JMS message selectors are required.



Also, your route starts from a direct endpoint. This is a synchronous call, so the caller will wait / block until Exchange is complete.

Also Splitter EIP can be configured to run in parallel, which will use parallel processing. And if you have a large message to split, then consider using streaming, which will split the message on demand, instead of loading the entire message content into memory.

In any case, a lot happens on the route. Can you pinpoint where your problem is? This makes it easier to help.

+2


source







All Articles