How to handle Java futures with Akka actors
I have a layered architecture in a Java web application. The UI layer is plain Java, the services are typed Akka actors, and external service calls (WS, DB, etc.) are wrapped in Hystrix commands.
The UI calls the service and the service returns an Akka future. It is an Akka future because I want to keep the UI code simple by using the onComplete and onFailure callbacks that Akka futures provide. The service then creates a future that does some mapping, etc., and wraps a call to a HystrixCommand, which returns a Java future.
So, in pseudocode:
UI:
    AkkaFuture future = service.getSomeData();

Service:
    public AkkaFuture getSomeData() {
        return future {
            JavaFuture future = new HystrixCommand(mapSomeData()).queue()
            // what to do here? currently just: return future.get()
        }
    }
The problem is that I would like to free up the thread the service actor is using and tie up only the threads that Hystrix uses. But the Java future prevents that, because I have to block on its completion. The only option I can think of (and I'm not sure I like it) is to poll the Java future constantly and complete the Akka future when the Java future finishes.
Note: The question is really not related to Hystrix per se, but I decided to mention it in case someone comes up with a solution specifically related to Hystrix.
Java futures are known to be inferior in design to something like Scala futures. Take a look at the discussion "How do I wrap a java.util.concurrent.Future in an Akka Future?", for example.
But: maybe, instead of polling (as suggested in that discussion), Hystrix offers some kind of onComplete callback? I do not know the library at all, but I stumbled upon an onComplete in the Hystrix API. Maybe that helps?
I'm marking @Hbf's answer as the solution, since I ended up writing an Akka poller as explained in "How do I wrap a java.util.concurrent.Future in an Akka Future?". For reference, I also tried:
- Creating a HystrixCommandExecutionHook and extending HystrixCommand to allow callbacks. That didn't work because the hook wasn't called at the right time.
- Using Guava's ListenableFuture by having a decorated executor create the futures inside Hystrix and then casting the futures returned by the commands. That doesn't work because Hystrix uses a ThreadPoolExecutor, which can't be decorated.
EDIT: I'm adding the Akka poller code below, since the original answer was in Scala and it hangs if the Java future doesn't cancel nicely. The solution below always walks away from the thread after a timeout.
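    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import akka.actor.ActorSystem;
    import akka.dispatch.Futures;
    import akka.japi.Option;
    import scala.concurrent.Future;
    import scala.concurrent.Promise;
    import scala.concurrent.duration.Deadline;
    import scala.concurrent.duration.Duration;
    import scala.concurrent.duration.FiniteDuration;

    // Returns an Akka (Scala) future that completes when the Java future does,
    // or fails once the optional timeout has elapsed.
    protected <T> Future<T> wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future<T> javaFuture, final Option<FiniteDuration> maybeTimeout, final ActorSystem actorSystem) {
        final Promise<T> promise = Futures.promise();
        if (maybeTimeout.isDefined()) {
            // turn the relative timeout into an absolute deadline
            pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
        } else {
            pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.<Deadline> none(), actorSystem);
        }
        return promise.future();
    }

    protected <T> void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future<T> javaFuture, final Promise<T> promise, final Option<Deadline> maybeTimeout, final ActorSystem actorSystem) {
        if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
            // on timeouts, try to cancel the Java future and simply walk away
            javaFuture.cancel(true);
            promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));
        } else if (javaFuture.isDone()) {
            try {
                // the future is done, so get() returns immediately
                promise.success(javaFuture.get());
            } catch (final Exception e) {
                promise.failure(e);
            }
        } else {
            // not done yet: check again in 50 ms without holding a thread in the meantime
            actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
                @Override
                public void run() {
                    pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
                }
            }, actorSystem.dispatcher());
        }
    }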
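For anyone who wants to try the polling idea without Akka on the classpath, here is a minimal JDK-only sketch of the same technique. It is not the code I run in production: CompletableFuture stands in for the Akka promise, a ScheduledExecutorService stands in for the actor system scheduler, and the names FuturePoller, wrap and poll are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: FuturePoller is a hypothetical name, not part of Akka or Hystrix.
class FuturePoller {

    // Returns a CompletableFuture that completes when javaFuture does,
    // or fails with a TimeoutException once timeoutMillis has elapsed.
    static <T> CompletableFuture<T> wrap(Future<T> javaFuture, long timeoutMillis,
                                         long pollMillis, ScheduledExecutorService scheduler) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        poll(javaFuture, promise, deadlineNanos, pollMillis, scheduler);
        return promise;
    }

    private static <T> void poll(Future<T> javaFuture, CompletableFuture<T> promise,
                                 long deadlineNanos, long pollMillis,
                                 ScheduledExecutorService scheduler) {
        if (System.nanoTime() - deadlineNanos >= 0) {
            // on timeout, try to cancel the Java future and walk away
            javaFuture.cancel(true);
            promise.completeExceptionally(new TimeoutException("Future timed out"));
        } else if (javaFuture.isDone()) {
            try {
                promise.complete(javaFuture.get()); // returns immediately: the future is done
            } catch (Exception e) {
                promise.completeExceptionally(e);
            }
        } else {
            // not done yet: look again later; no thread is blocked between checks
            scheduler.schedule(
                () -> poll(javaFuture, promise, deadlineNanos, pollMillis, scheduler),
                pollMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

Calling FuturePoller.wrap(command.queue(), 1000, 50, scheduler) would then hand back something you can attach callbacks to (thenAccept, exceptionally). The poll interval is a trade-off: a shorter interval lowers the extra latency but schedules more checks.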
As of Hystrix 1.3, it also supports true non-blocking callbacks, which fit much better with the non-blocking, composable behavior of Akka/Scala futures: https://github.com/Netflix/Hystrix/wiki/How-To-Use#wiki-Reactive-Execution
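To show why callbacks remove the need for polling, here is a rough JDK-only sketch of the bridging pattern. The AsyncCommand interface below is purely hypothetical and is not Hystrix's API (see the linked wiki page for the real reactive calls); CompletableFuture again stands in for an Akka promise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative only: CallbackBridge and AsyncCommand are hypothetical names.
class CallbackBridge {

    // Stand-in for any command that reports its outcome through callbacks.
    interface AsyncCommand<T> {
        void execute(Consumer<T> onSuccess, Consumer<Throwable> onError);
    }

    // The promise is completed directly from the callbacks,
    // so no thread blocks and nothing has to poll.
    static <T> CompletableFuture<T> toFuture(AsyncCommand<T> command) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        command.execute(promise::complete, promise::completeExceptionally);
        return promise;
    }
}
```

With a real callback source, the two method references would simply be registered as the success and error handlers, and the service actor's thread is released as soon as toFuture returns.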