In Java, how do I determine the future <V> that returns the best out of multiple answers?

I need a type value V

to be calculated asynchronously. Unfortunately, the best answer may take too long, so I have a few other values ​​to take as a last resort. What I would like to do is define Future<V>

which one I can call with a timeout and return it with the best answer available at the time it was. Something like:

Future<V> theValue = // something involving executor.submit()
    // inside here, asynchronous calls to find
    V default = // something pretty quick to return
    V good = // something that will take longer and might not return in time
    V better = // something that will take longest but is the best answer

V v = theValue.get(5, TimeUnit.SECONDS); // now v should contain one of default,
    // good, or better, preferring them in the expected order

      

I'm sure this is a fairly common model, but I haven't been able to find a good example. Any help?

+3


source to share


3 answers


In the proposed scenario, there are three different computation options (I'll call them good, best, and best). Each successive version produces a result that is preferred over the previous version, but due to the increased complexity it may take a correspondingly longer period of time to complete. There is a certain amount of time that the caller is willing to wait for some result, and in this answer I will use 5 (5) seconds for this cutoff time.

It is possible to maintain this strict order of preference by avoiding loops and queues by using a pair of latches for each batch of related operations.


Primary logic

ExecutorService execService = // ... e.g. new ThreadPoolExecutor(...)

<T> T compute(
    Callable<? extends T> good,
    Callable<? extends T> better,
    Callable<? extends T> best) {

    RelatedCallables calls = new RelatedCallables(); // new for each batch
    Future<T> bestResult = execService.submit(calls.wrap(best)); // first wrapped is primary
    Future<T> betterResult = execService.submit(calls.wrap(better));
    Future<T> goodResult = execService.submit(calls.wrap(good));

    try {
        if (!calls.awaitPrimary(5, TimeUnit.SECONDS)) {
            calls.awaitAny(); // waits indefinitely, unless THIS thread interrupted
        }
        // reaching here means at least one of them has a result
        if (bestResult.isDone()) return bestResult.get();
        if (betterResult.isDone()) return betterResult.get();
        return goodResult.get();
    }
    catch (ExecutionException failedExecution) {
        // TODO: handling this is left as an exercise for the reader
        return null;
    }
    catch (InterruptedException interrupted) {
        // TODO: handling this is left as an exercise for the reader
        return null;
    }
    finally {
        boolean sendInterrupt = true; // or false, depending on your needs
        goodResult.cancel(sendInterrupt);
        betterResult.cancel(sendInterrupt);
        bestResult.cancel(sendInterrupt);
    }
}

      

This solution uses a helper class RelatedCallables

(more details below) to communicate between three computations, represented as instances Callable

. The helper class will wrap each instance and the wrapper is presented in ExecutorService

for parallel execution. In this implementation, it is important that the best one is Callable

wrapped first. The order in which the other instances are wrapped and dispatched is not important.

The combination of methods awaitPrimary

and awaitAny

helper RelatedCallables

combined with conditional if

sets our wait and timeout policy. If the best (primary) result is available within the specified timeout, it skips the contents of the block if

and jumps directly to the returned results so that the caller does not have to wait five seconds.

If awaitPrimary

expires before the best evaluation completes, it enters the true conditional branch and waits indefinitely for any evaluation to complete. It is usually expected (but I am not suggesting) that at least one of the other computations will complete within the main timeout; if yes, awaitAny

will return immediately. Instead of waiting indefinitely for one of the three computations, it is also possible to return a null

specified value or throw an exception with some relatively minor modifications.



Once the program thread traverses the conditional block, it checks each one Future

in order of preference and returns a value from the first, which indicates that it has completed. Also note that the block finally

tries to cancel any pending calculations.


Helper class (internal)

static class RelatedCallables {
    private final CountDownLatch primaryLatch = new CountDownLatch(1);
    private final CountDownLatch anyLatch = new CountDownLatch(1);
    private boolean hasPrimary;

    void awaitAny() throws InterruptedException {
        anyLatch.await();
    }

    boolean awaitPrimary(long timeout, TimeUnit unit) throws InterruptedException {
        return primaryLatch.await(timeout, unit);
    }

    <T> Callable<T> wrap(final Callable<? extends T> original) {
        final boolean isPrimary = !hasPrimary;
        hasPrimary = true;

        return new Callable<T>() {
            @Override
            public T call() throws Exception {
                try {
                    return original.call();
                }
                finally {
                    anyLatch.countDown();
                    if (isPrimary) primaryLatch.countDown();
                }
            }
        };
    }
}

      

It is a relatively simple wrapper Callable

that binds to a wrapped instance. Its main purpose is to reduce the number of delays anyLatch

after the wrapped instance completes. This way, it awaitAny

will know if any of the cover wrapped by this helper is complete.

There is a second latch, used only with the first Callable

one presented in wrap

, that logically distinguishes the primary (or better) instance. The wrapper for this instance shrinks this individual latch so that it awaitPrimary

can fail quickly if the best computation completes before the cutoff time expires.

Since CountDownLatch

it cannot be reused, each separate batch of operations Callable

requires a new pair of latches. This implementation accomplishes this by creating a new instance RelatedCallables

for each batch.

+1


source


I suggest you use BlockingQueue to store the results

class MainClass {
  BlockingQueue<V> queue = new LinkedBlockingQueue();

  public V runMain(long timeoutInMillis) {
    DefaultSolution defaultSolution = new DefaultSolution(queue);
    GoodSolution goodSolution = new GoodSolution(queue);
    BestSolution bestSolution = new BestSolution(queue);

    // need an ExecutorService or something along those lines to run these in separate threads
    defaultSolution.run();
    goodSolution.run();
    bestSolution.run();

    int numberOfSolutions = 3;
    return checkQueue(numberOfSolutions, timeoutInMillis);
  }

  private V checkQueue(int numberOfSolutions, long timeoutInMillis) {
    V solution = null;
    for(int i = 0; i < numberOfSolutions && timeoutInMillis > 0; i++) {
      long startTime = System.currentTimeMillis();
      V temp = queue.poll(timeoutInMillis, MILLISECONDS);
      if(temp != null) {
        // poll did not timeout
        solution = temp;
      } else {
        // poll timed out
        return solution;
      }
      // subtract time elapsed from timeout
      timeoutInMillis -= (System.currentTimeMillis() - startTime);
    }
    return solution;
  }
}

class DefaultSolution implements Runnable {
  BlockingQueue<V> queue;

  DefaultSolution(BlockingQueue<V> queueParam) {
    queue = queueParam;
  }

  void run() {
    V solution = // compute solution
    // BlockingQueue will throw an exception if you try to add a null
    if(solution != null) queue.offer(solution);
  }
}

      



Your main class creates BlockingQueue<V>

and passes it as a parameter to three classes Solution

. Everyone Solution

puts their solution in the queue. The main method blocks the queue with a timeout (a method poll

that returns the head of the queue or null

, if it expires), then blocks again with a reduced timeout (the original timeout minus the time that has elapsed).

If you have a priori knowledge of what BestSolution

will always be better than GoodSolution

then you should change V

so that it indicates which one Solution

provided the solution - this is in case it BestSolution

somehow manages to compute its solution before GoodSolution

, in which case the given the above code will automatically overwrite the BestSolution's

earlier result with the subsequent result GoodSolution's

(for example, give DefaultSolution

priority 0, GoodSolution

priority 1, and BestSolution

priority 2, and checkQueue

overwrite Solution

only if the new solution has a higher priority than the old solution).

0


source


I think you can use three futures as Fabian suggested without waiting for a timeout if a better future is done sooner. Check out the class FallbackFuture

in the following separate example. It has the best result if available, and falls back to good or default otherwise.

public static void main(String[] args) {
    try {
        ExecutorService exec = Executors.newFixedThreadPool(8);
        long tm = System.currentTimeMillis();
        Future<String> better = future(exec, "better", 900);
        Future<String> good = future(exec, "good", 600);
        Future<String> defaulT = future(exec, "default", 300);
        //FutureCompetition<String> f = new FutureCompetition<>(better, good, defaulT);
        FallbackFuture<String> f = new FallbackFuture<>(better, good, defaulT);
        int timeout = 5000;
        System.out.println(f.get(timeout, TimeUnit.MILLISECONDS));
        System.out.println("Took " + (System.currentTimeMillis() - tm) + "ms");
        f.cancel(true);
        exec.shutdown();
    } catch (Exception ex) {
        ex.printStackTrace(System.err);
    }
}

public static class FallbackFuture<V> implements Future<V> {
    private final Future<V> first;
    private final Future<V>[] more;

    @SafeVarargs
    public FallbackFuture(Future<V> first, Future<V>... more) {
        this.first = first;
        this.more = more;
    }

    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return first.get(timeout, unit);
        } catch (TimeoutException ex) {
            for (Future<V> c : more) {
                try {
                    return c.get(0, unit);
                } catch (TimeoutException ex2) {
                    continue;
                }
            }
            throw ex;
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean b = first.cancel(mayInterruptIfRunning);
        for (Future<V> c : more)
            b |= c.cancel(mayInterruptIfRunning);
        return b;
    }

    @Override
    public boolean isCancelled() {
        for (Future<V> c : more)
            if (!c.isCancelled()) return false;
        return first.isCancelled();
    }

    @Override
    public boolean isDone() {
        for (Future<V> c : more)
            if (!c.isDone()) return false;
        return first.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return first.get();
    }
}

public static class FutureCompetition<V> {
    private final Future<V> first;
    private final Future<V>[] more;

    @SafeVarargs
    public FutureCompetition(Future<V> first, Future<V>... more) {
        this.first = first;
        this.more = more;
    }

    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return first.get(timeout, unit);
        } catch (TimeoutException ex) {
            for (Future<V> c : more) {
                try {
                    return c.get(0, unit);
                } catch (TimeoutException ex2) {
                    continue;
                }
            }
            throw ex;
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean b = first.cancel(mayInterruptIfRunning);
        for (Future<V> c : more)
            b |= c.cancel(mayInterruptIfRunning);
        return b;
    }
}

static Future<String> future(ExecutorService exec, final String retVal, final int delayMillis) {
    return exec.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(delayMillis);
                return retVal;
            } catch (InterruptedException ex) {
                System.err.println("interrupted " + retVal);
                throw ex;
            }
        }
    });
}

      

0


source







All Articles