In Java, how do I define a Future<V> that returns the best out of multiple answers?
I need a value of type V to be calculated asynchronously. Unfortunately, the best answer may take too long to compute, so I have a few other values I would accept as a last resort. What I would like to do is define a Future<V> that I can call with a timeout and that returns the best answer available at that time. Something like:
Future<V> theValue = // something involving executor.submit()
// inside here, asynchronous calls to find
V default = // something pretty quick to return
V good = // something that will take longer and might not return in time
V better = // something that will take longest but is the best answer
V v = theValue.get(5, TimeUnit.SECONDS); // now v should contain one of default,
// good, or better, preferring them in the expected order
I'm sure this is a fairly common pattern, but I haven't been able to find a good example. Any help?
In the proposed scenario, there are three different computation options (I'll call them good, better, and best). Each successive version produces a result that is preferred over the previous one, but due to its increased complexity it may take a correspondingly longer time to complete. There is a certain amount of time that the caller is willing to wait for some result, and in this answer I will use five (5) seconds for this cutoff time.
It is possible to maintain this strict order of preference without loops or queues by using a pair of latches for each batch of related operations.
Primary logic
ExecutorService execService = // ... e.g. new ThreadPoolExecutor(...)

<T> T compute(
        Callable<? extends T> good,
        Callable<? extends T> better,
        Callable<? extends T> best) {
    RelatedCallables calls = new RelatedCallables(); // new for each batch
    Future<T> bestResult = execService.submit(calls.wrap(best)); // first wrapped is primary
    Future<T> betterResult = execService.submit(calls.wrap(better));
    Future<T> goodResult = execService.submit(calls.wrap(good));
    try {
        if (!calls.awaitPrimary(5, TimeUnit.SECONDS)) {
            calls.awaitAny(); // waits indefinitely, unless THIS thread interrupted
        }
        // reaching here means at least one of them has a result
        if (bestResult.isDone()) return bestResult.get();
        if (betterResult.isDone()) return betterResult.get();
        return goodResult.get();
    }
    catch (ExecutionException failedExecution) {
        // TODO: handling this is left as an exercise for the reader
        return null;
    }
    catch (InterruptedException interrupted) {
        // TODO: handling this is left as an exercise for the reader
        return null;
    }
    finally {
        boolean sendInterrupt = true; // or false, depending on your needs
        goodResult.cancel(sendInterrupt);
        betterResult.cancel(sendInterrupt);
        bestResult.cancel(sendInterrupt);
    }
}
This solution uses a helper class, RelatedCallables (more details below), to coordinate the three computations, which are represented as Callable instances. The helper class wraps each instance, and the wrapper is submitted to the ExecutorService for parallel execution. In this implementation, it is important that the best Callable is wrapped first. The order in which the other instances are wrapped and submitted is not important.
The combination of the awaitPrimary and awaitAny methods of the RelatedCallables helper, together with the if conditional, sets our wait and timeout policy. If the best (primary) result is available within the specified timeout, execution skips the body of the if block and proceeds directly to returning a result, so the caller does not have to wait the full five seconds.
If awaitPrimary times out before the best computation completes, execution enters the true branch of the conditional and waits indefinitely for any computation to complete. It is generally expected (but I am not assuming) that at least one of the other computations will have completed within the main timeout; if so, awaitAny will return immediately. Instead of waiting indefinitely for one of the three computations, it is also possible to return null, return a predetermined value, or throw an exception, with some relatively minor modifications.
Once the thread passes the conditional block, it checks each Future in order of preference and returns the value from the first one that reports it has completed. Also note that the finally block tries to cancel any pending computations.
Helper class (internal)
static class RelatedCallables {
    private final CountDownLatch primaryLatch = new CountDownLatch(1);
    private final CountDownLatch anyLatch = new CountDownLatch(1);
    private boolean hasPrimary;

    void awaitAny() throws InterruptedException {
        anyLatch.await();
    }

    boolean awaitPrimary(long timeout, TimeUnit unit) throws InterruptedException {
        return primaryLatch.await(timeout, unit);
    }

    <T> Callable<T> wrap(final Callable<? extends T> original) {
        final boolean isPrimary = !hasPrimary;
        hasPrimary = true;
        return new Callable<T>() {
            @Override
            public T call() throws Exception {
                try {
                    return original.call();
                }
                finally {
                    anyLatch.countDown();
                    if (isPrimary) primaryLatch.countDown();
                }
            }
        };
    }
}
This is a relatively simple Callable wrapper that delegates to the wrapped instance. Its main purpose is to count down anyLatch after the wrapped instance completes. This way, awaitAny will know when any of the callables wrapped by this helper has completed.
There is a second latch, used only with the first Callable passed to wrap, that logically distinguishes the primary (or best) instance. The wrapper for this instance counts down this separate latch so that awaitPrimary can return quickly if the best computation completes before the cutoff time expires.
Since a CountDownLatch cannot be reused, each separate batch of Callable operations requires a new pair of latches. This implementation accomplishes this by creating a new RelatedCallables instance for each batch.
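The wait policy described above (wait for the primary up to the cutoff, then accept whatever has finished) can be condensed into one self-contained sketch. This is only an illustration under stated assumptions, not the code from this answer: the class name BestEffort, the grace parameter, and the choice to throw TimeoutException instead of waiting indefinitely are all made up for the example. It also differs in two details: each task stores its value before opening a latch, so the caller never has to ask a Future whether it is done, and a task that fails does not open any latch. Tasks are assumed to return non-null values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class BestEffort {

    // Hypothetical helper: tasks are passed best-first and must return non-null values.
    @SafeVarargs
    static <T> T compute(ExecutorService exec, long timeout, long grace, TimeUnit unit,
                         Callable<? extends T>... bestFirst)
            throws InterruptedException, TimeoutException {
        final AtomicReferenceArray<T> results = new AtomicReferenceArray<>(bestFirst.length);
        final CountDownLatch primaryLatch = new CountDownLatch(1);
        final CountDownLatch anyLatch = new CountDownLatch(1);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < bestFirst.length; i++) {
                final int index = i;
                final Callable<? extends T> task = bestFirst[i];
                futures.add(exec.submit(() -> {
                    results.set(index, task.call()); // publish before opening a latch
                    anyLatch.countDown();
                    if (index == 0) primaryLatch.countDown();
                    return null;
                }));
            }
            if (!primaryLatch.await(timeout, unit)) {
                // The best answer missed the cutoff: allow a bounded grace period
                // for any answer instead of waiting indefinitely.
                if (!anyLatch.await(grace, unit)) {
                    throw new TimeoutException("no result within timeout plus grace period");
                }
            }
            // Return the most preferred value that has been published so far.
            for (int i = 0; i < bestFirst.length; i++) {
                T value = results.get(i);
                if (value != null) return value;
            }
            throw new IllegalStateException("a task returned null");
        } finally {
            for (Future<?> f : futures) f.cancel(true); // stop whatever is still running
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(3);
        try {
            // "best" is too slow for the 200 ms cutoff, so a fallback is returned.
            String v = BestEffort.<String>compute(exec, 200, 0, TimeUnit.MILLISECONDS,
                    () -> { Thread.sleep(2000); return "best"; },
                    () -> { Thread.sleep(50); return "better"; },
                    () -> "good");
            System.out.println(v);
        } finally {
            exec.shutdownNow();
        }
    }
}
```

With the timings used in main, the call would normally return "better" shortly after the 200 ms cutoff, because "best" is still sleeping at that point. Passing a grace period of zero means that only results already finished at the cutoff are considered.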
I suggest you use a BlockingQueue to store the results:
class MainClass {
    private final BlockingQueue<V> queue = new LinkedBlockingQueue<>();
    // any executor that runs the three tasks in separate threads will do
    private final ExecutorService executor = Executors.newFixedThreadPool(3);

    public V runMain(long timeoutInMillis) throws InterruptedException {
        DefaultSolution defaultSolution = new DefaultSolution(queue);
        GoodSolution goodSolution = new GoodSolution(queue);
        BestSolution bestSolution = new BestSolution(queue);
        // run each solution in its own thread; calling run() directly would block here
        executor.execute(defaultSolution);
        executor.execute(goodSolution);
        executor.execute(bestSolution);
        int numberOfSolutions = 3;
        return checkQueue(numberOfSolutions, timeoutInMillis);
    }

    private V checkQueue(int numberOfSolutions, long timeoutInMillis) throws InterruptedException {
        V solution = null;
        for (int i = 0; i < numberOfSolutions && timeoutInMillis > 0; i++) {
            long startTime = System.currentTimeMillis();
            V temp = queue.poll(timeoutInMillis, TimeUnit.MILLISECONDS);
            if (temp != null) {
                // poll did not time out
                solution = temp;
            } else {
                // poll timed out
                return solution;
            }
            // subtract time elapsed from timeout
            timeoutInMillis -= (System.currentTimeMillis() - startTime);
        }
        return solution;
    }
}
class DefaultSolution implements Runnable {
    private final BlockingQueue<V> queue;

    DefaultSolution(BlockingQueue<V> queueParam) {
        queue = queueParam;
    }

    @Override
    public void run() {
        V solution = // compute solution
        // BlockingQueue will throw an exception if you try to add a null
        if (solution != null) queue.offer(solution);
    }
}
Your main class creates a BlockingQueue<V> and passes it as a parameter to the three Solution classes. Each Solution puts its result into the queue. The main method blocks on the queue with a timeout (the poll method returns the head of the queue, or null if the timeout expires), then blocks again with a reduced timeout (the original timeout minus the time that has elapsed).
If you know a priori that BestSolution's result will always be better than GoodSolution's, then you should change V so that it indicates which Solution provided the result. This covers the case where BestSolution somehow manages to compute its result before GoodSolution, in which case the code above would overwrite BestSolution's earlier result with GoodSolution's later one. For example, give DefaultSolution priority 0, GoodSolution priority 1, and BestSolution priority 2, and have checkQueue overwrite the solution only if the new one has a higher priority than the old one.
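The priority idea could look roughly like the sketch below. It is a minimal illustration under assumptions, not part of the code above: the Ranked wrapper, the bestOf method, and the class name PriorityQueueDemo are hypothetical names introduced for the example, and a fixed deadline based on System.nanoTime() stands in for the manual subtraction of elapsed time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PriorityQueueDemo {

    /** A result tagged with the priority of the solver that produced it (hypothetical type). */
    static final class Ranked<V> {
        final int priority;
        final V value;

        Ranked(int priority, V value) {
            this.priority = priority;
            this.value = value;
        }
    }

    /**
     * Polls until the deadline passes or 'expected' results have arrived,
     * keeping only the result with the highest priority seen so far.
     * Returns null if nothing arrived in time.
     */
    static <V> V bestOf(BlockingQueue<Ranked<V>> queue, int expected, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        Ranked<V> best = null;
        for (int i = 0; i < expected; i++) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) break;                       // out of time
            Ranked<V> next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) break;                         // poll timed out
            if (best == null || next.priority > best.priority) {
                best = next;                                 // never downgrade
            }
        }
        return best == null ? null : best.value;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Ranked<String>> queue = new LinkedBlockingQueue<>();
        ExecutorService exec = Executors.newFixedThreadPool(3);
        try {
            // Here the best solver happens to finish first; later, lower-priority
            // results must not replace it.
            exec.execute(() -> queue.offer(new Ranked<>(2, "best")));
            exec.execute(() -> { sleepQuietly(50); queue.offer(new Ranked<>(1, "good")); });
            exec.execute(() -> { sleepQuietly(100); queue.offer(new Ranked<>(0, "default")); });
            System.out.println(bestOf(queue, 3, 1000));
        } finally {
            exec.shutdownNow();
        }
    }
}
```

Because a lower-priority result never replaces a higher-priority one, the order in which the solvers finish no longer affects which value is returned.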
I think you can use three futures, as Fabian suggested, without waiting for the full timeout if a better future finishes sooner. Check out the FallbackFuture class in the following self-contained example. It returns the best result if available, and falls back to good or default otherwise.
public static void main(String[] args) {
    try {
        ExecutorService exec = Executors.newFixedThreadPool(8);
        long tm = System.currentTimeMillis();
        Future<String> better = future(exec, "better", 900);
        Future<String> good = future(exec, "good", 600);
        Future<String> defaulT = future(exec, "default", 300);
        //FutureCompetition<String> f = new FutureCompetition<>(better, good, defaulT);
        FallbackFuture<String> f = new FallbackFuture<>(better, good, defaulT);
        int timeout = 5000;
        System.out.println(f.get(timeout, TimeUnit.MILLISECONDS));
        System.out.println("Took " + (System.currentTimeMillis() - tm) + "ms");
        f.cancel(true);
        exec.shutdown();
    } catch (Exception ex) {
        ex.printStackTrace(System.err);
    }
}
public static class FallbackFuture<V> implements Future<V> {
    private final Future<V> first;
    private final Future<V>[] more;

    @SafeVarargs
    public FallbackFuture(Future<V> first, Future<V>... more) {
        this.first = first;
        this.more = more;
    }

    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return first.get(timeout, unit);
        } catch (TimeoutException ex) {
            for (Future<V> c : more) {
                try {
                    return c.get(0, unit);
                } catch (TimeoutException ex2) {
                    continue;
                }
            }
            throw ex;
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean b = first.cancel(mayInterruptIfRunning);
        for (Future<V> c : more)
            b |= c.cancel(mayInterruptIfRunning);
        return b;
    }

    @Override
    public boolean isCancelled() {
        for (Future<V> c : more)
            if (!c.isCancelled()) return false;
        return first.isCancelled();
    }

    @Override
    public boolean isDone() {
        for (Future<V> c : more)
            if (!c.isDone()) return false;
        return first.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return first.get();
    }
}
public static class FutureCompetition<V> {
    private final Future<V> first;
    private final Future<V>[] more;

    @SafeVarargs
    public FutureCompetition(Future<V> first, Future<V>... more) {
        this.first = first;
        this.more = more;
    }

    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return first.get(timeout, unit);
        } catch (TimeoutException ex) {
            for (Future<V> c : more) {
                try {
                    return c.get(0, unit);
                } catch (TimeoutException ex2) {
                    continue;
                }
            }
            throw ex;
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean b = first.cancel(mayInterruptIfRunning);
        for (Future<V> c : more)
            b |= c.cancel(mayInterruptIfRunning);
        return b;
    }
}
static Future<String> future(ExecutorService exec, final String retVal, final int delayMillis) {
    return exec.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(delayMillis);
                return retVal;
            } catch (InterruptedException ex) {
                System.err.println("interrupted " + retVal);
                throw ex;
            }
        }
    });
}