Camel: is it possible to implement request / response and competing consumers with an asynchronous processor?
I am struggling with an implementation of a Request / Reply exchange that needs to be handled concurrently by multiple competing consumers .
I have one standalone Master module that is responsible for creating a task queue. And I have many Worker modules that need to consume messages from this queue at the same time.
This Master is part of Camel routing:
from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
EdgarFullIndexLocation location =
exchange.getIn().getBody(EdgarFullIndexLocation.class);
exchange.getIn().setBody(location.getId().toJson(), String.class);
}
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
//"&transactedInOut=true" +
"&requestTimeout=" + Integer.MAX_VALUE +
"&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);
This is the working part of the Camel route where I am using a queue:
from("activemq:queue:tasksQueue?asyncConsumer=true" +
"&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
.when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
.process(new FetchIndexTaskProcessor())
.otherwise()
.to("log:UNKNOWN.TASK?level=DEBUG");
Here FetchIndexTaskProcessor implements AsyncProcessor :
public class FetchIndexTaskProcessor implements AsyncProcessor {
@Override public void process(Exchange exchange) throws Exception {}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
FetchIndexTask task = new FetchIndexTask(exchange, callback);
task.start();
return false;
}
}
Here FetchIndexTask extends Thread . After start (), the new thread is responsible for:
- Dynamically adding a route.
- Blocking until the exchange of this route is completed.
- Preparing the response for the initial exchange.
- The call is
callback.done(false);
at the end.
Everything works, except for some of the competing customers - it's always one customer at a time.
I've tried many options:
- specifying a pool of threads with
.threads(10)
in different places. - using endpoint parameters like
asyncConsumer
andconcurrentConsumers
But it looks like I'm missing something and I can't seem to get it to work at the same time. What is the correct way to do this?
source to share
If you are using Camel 2.9 or higher, I suggest using answerToType = Exclusive on the activemq endpoint where you are making the request / response. This tells Camel that the queue is exclusive and it speeds up since no JMS message selectors are required to select the expected correlated messages.
See the JMS Request-Response section later in the Camel JMS docs: http://camel.apache.org/jms
If you are using transient queues, this is also very fast since no JMS message selectors are required.
Also, your route starts from a direct endpoint. This is a synchronous call, so the caller will wait / block until Exchange is complete.
Also Splitter EIP can be configured to run in parallel, which will use parallel processing. And if you have a large message to split, then consider using streaming, which will split the message on demand, instead of loading the entire message content into memory.
In any case, a lot happens on the route. Can you pinpoint where your problem is? This makes it easier to help.
source to share