Variable number of Callables with different return types
The problem at hand is that I have methods one(), two(), three(), four() that have different return types, say A, B, C, D, and I need to spawn a variable number of threads (one for each method, depending on the use case; I would like to call a subset of the methods at the same time). I currently use a cachedThreadPool to submit these calls. Below is the code:
public class Dispatcher {
    public void dispatch(List<MethodNames> methodNames) {
        // Iterate through the list of methodNames
        // and submit each method to the ExecutorService
        for (MethodNames m : methodNames) {
            switch (m) {
                case ONE: // submit one()
                    // wait and get the future
                    Future<A> future = cachePool.submit(new Callable<A>() {
                        @Override
                        public A call() {
                            return one();
                        }
                    });
                    A response = future.get();
                    break;
                ....
            }
        }
    }
}
public enum MethodNames {
ONE, TWO, THREE, FOUR
}
//Example methods:
public A one() {
}
public B two() {
}
My question is how to do the above so that all the method calls are made without waiting for each one to complete. Also, how can I collect all the futures and wait for them to complete, given that each future has a different generic type (Future<A>, Future<B>, etc.)? I make the submit() call inside a case statement, so I don't have access to the returned Future<T> outside of the case. I could use if/else inside the for loop instead, but I'm trying to figure out whether there is a better way to achieve this.
I would do it like this:
- Create an interface, say I.
- Let classes A, B, C and D implement I.
- Use the enum's valueOf and method overriding to remove the switch-case.
- Use polymorphism and return I from all methods.
Here is the code (not including A, B, C, D and I, as they are just simple classes and an interface with nothing much in them):
Dispatcher.java
package com.test.thread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Dispatcher {
    public void dispatch() throws InterruptedException, ExecutionException {
        Map<MethodNames, Future<I>> response = new HashMap<MethodNames, Future<I>>();
        ExecutorService cachePool = Executors.newCachedThreadPool();
        // Submit every worker without waiting for the previous one to finish
        for (MethodNames methodNames : MethodNames.values()) {
            Future<I> future = cachePool.submit(methodNames.worker());
            response.put(methodNames, future);
        }
        // awaitTermination only returns before the timeout after shutdown() was requested
        cachePool.shutdown();
        cachePool.awaitTermination(5, TimeUnit.MINUTES);
        for (MethodNames key : response.keySet()) {
            I result = response.get(key).get();
            System.out.println("result :: " + result);
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        new Dispatcher().dispatch();
    }
}
MethodNames.java
package com.test.thread;

import java.util.concurrent.*;

public enum MethodNames {
    ONE {
        @Override
        public Callable<I> worker() {
            return new Callable<I>() {
                @Override
                public I call() throws InterruptedException {
                    System.out.println("Thread1");
                    TimeUnit.SECONDS.sleep(30);
                    return new A();
                }
            };
        }
    },
    TWO {
        @Override
        public Callable<I> worker() throws InterruptedException {
            return new Callable<I>() {
                @Override
                public I call() throws InterruptedException {
                    System.out.println("Thread2");
                    TimeUnit.SECONDS.sleep(30);
                    return new B();
                }
            };
        }
    },
    THREE {
        @Override
        public Callable<I> worker() throws InterruptedException {
            return new Callable<I>() {
                @Override
                public I call() throws InterruptedException {
                    System.out.println("Thread3");
                    TimeUnit.SECONDS.sleep(30);
                    return new C();
                }
            };
        }
    },
    FOUR {
        @Override
        public Callable<I> worker() throws InterruptedException {
            return new Callable<I>() {
                @Override
                public I call() throws InterruptedException {
                    System.out.println("Thread4");
                    TimeUnit.SECONDS.sleep(30);
                    return new D();
                }
            };
        }
    };

    // Each constant supplies the Callable that does its work
    public abstract Callable<I> worker() throws InterruptedException;
}
It seems better to separate the get from the Future, so pass a Callable as a parameter to each enum constant. The constant can then create the Future when it is instantiated. Unfortunately, for generic typing, the result class has to be stored and used to get a correctly typed value. Apart from that, the caller does not need to know anything more about it.
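public enum MethodNames {
    // Assumes one(), two(), ... and cachePool are statically accessible here
    ONE(A.class, () -> one()),
    TWO(B.class, () -> two()),
    ...
    FOUR(D.class, () -> four());

    private final Class<?> resultType;
    private final Future<?> future;

    // The task is submitted as soon as the enum constant is created
    private <T> MethodNames(Class<T> resultType, Callable<T> callable) {
        this.resultType = resultType;
        future = cachePool.submit(callable);
    }

    public <T> T getResponse(Class<T> type) throws InterruptedException, ExecutionException {
        Object response = future.get();
        // Check the stored class against the requested one, then cast
        return resultType.asSubclass(type).cast(response);
    }
}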
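If these Callables are the only tasks you submit to the ExecutorService, you can call awaitTermination on cachePool after submitting your tasks (which can then be Runnables instead of Callables). Note that awaitTermination only returns before its timeout once shutdown() has been called on the pool, so the pool cannot be reused afterwards.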
public class Dispatcher {
    public void dispatch(List<MethodNames> methodNames) throws InterruptedException {
        for (MethodNames m : methodNames) {
            switch (m) {
                case ONE: // submit one()
                    cachePool.execute(new Runnable() {
                        @Override
                        public void run() {
                            // do work
                        }
                    });
                    break;
                ....
            }
        }
        // Stop accepting new tasks, then block until the submitted ones finish
        cachePool.shutdown();
        cachePool.awaitTermination(100, TimeUnit.HOURS);
    }
}
If there are other, unrelated tasks in cachePool, or if for some other reason you cannot use awaitTermination, you can block on a Semaphore instead. Initialize the Semaphore with zero permits; each task releases one permit when it completes, and the dispatch method blocks on semaphore.acquire(methodNames.size()), which waits until all tasks have called release (and have therefore completed). Note the try-finally block in the Runnable: without it, a Runnable that throws an exception would never call release, and the dispatch method would block forever.
public class Dispatcher {
    public void dispatch(List<MethodNames> methodNames) throws InterruptedException {
        Semaphore semaphore = new Semaphore(0);
        for (MethodNames m : methodNames) {
            switch (m) {
                case ONE: // submit one()
                    cachePool.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // do work
                            } finally {
                                // Always release, even if the work throws
                                semaphore.release();
                            }
                        }
                    });
                    break;
                ....
            }
        }
        // Blocks until every task has released its permit
        semaphore.acquire(methodNames.size());
    }
}
If you are collecting the results of the tasks (it does not look like you are doing that at the moment, but requirements tend to change), then each Runnable can store its result in a shared ConcurrentLinkedQueue or some other thread-safe data structure (or one data structure per return type, etc.), and dispatch can then process those results once semaphore.acquire or awaitTermination unblocks.
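As a rough illustration of the result-collecting variant, here is a minimal sketch. The class name CollectingDispatcher, passing the pool and the tasks in as parameters, and the choice to record a failure in the same queue are all assumptions made for this example; they are not part of the original code.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

// Hypothetical helper, not taken from the question's code.
class CollectingDispatcher {
    // Runs every task on the pool and returns once all of them have finished.
    static Queue<Object> dispatch(ExecutorService pool, List<Callable<?>> tasks)
            throws InterruptedException {
        Queue<Object> results = new ConcurrentLinkedQueue<>();
        Semaphore semaphore = new Semaphore(0);
        for (Callable<?> task : tasks) {
            pool.execute(() -> {
                try {
                    // ConcurrentLinkedQueue rejects null, so tasks should not return null
                    results.add(task.call());
                } catch (Exception e) {
                    // Assumption for this sketch: a failure is stored like a result
                    results.add(e);
                } finally {
                    // Always release, otherwise dispatch would block forever
                    semaphore.release();
                }
            });
        }
        // Wait until every task has released its permit
        semaphore.acquire(tasks.size());
        return results;
    }
}
```

The queue holds the results in completion order, not submission order, so the caller still has to inspect the type of each element. That is the price of putting different return types into one shared structure.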
Ingredients
You need to approach the solution in several steps:
Waiting for multiple futures
In this case you can simply use Future<?> in the signature, because just to wait you do not need to know the type of the result. So you can simply write a method void waitForAll(List<Future<?>> futures).
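A minimal sketch of such a waitForAll method might look like this. The enclosing class name FutureWaiter and the demo method are hypothetical and only there to make the example self-contained; the two lambdas stand in for methods with different return types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class for illustration.
class FutureWaiter {
    // Blocks until every future in the list has completed.
    // The wildcard type is enough because the results are ignored here.
    static void waitForAll(List<Future<?>> futures)
            throws InterruptedException, ExecutionException {
        for (Future<?> future : futures) {
            future.get(); // a failed task surfaces as ExecutionException
        }
    }

    // Submits two tasks with different result types and counts the finished ones.
    static int demo() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<?>> futures = new ArrayList<>();
            futures.add(pool.submit(() -> "a string result"));
            futures.add(pool.submit(() -> 42));
            waitForAll(futures);
            int done = 0;
            for (Future<?> f : futures) {
                if (f.isDone()) {
                    done++;
                }
            }
            return done;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because both futures were submitted before waitForAll is called, the tasks run concurrently and the loop only waits for whichever one is still running.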
Getting a type-safe result from an unknown future
For this you need some kind of descriptor that knows the type of the Future. Because of Java's type erasure, this descriptor has to keep hold of the Class<T> somehow. So the simplest descriptor is the corresponding Class<T> itself (with T being one of A and B in your example).
So you can just store the futures in a Map<Class<?>, Future<?>> (or a MultiMap) with an additional typed get method <T> Future<T> get(Class<T> handle).
You can replace your enum MethodNames with this descriptor.
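One possible sketch of such a map with a typed get method follows. The class name TypedFutures and its submit method are assumptions made for this example, and String and Integer stand in for the return types A and B.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: maps a result class to the future that produces it.
class TypedFutures {
    private final Map<Class<?>, Future<?>> futures = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    TypedFutures(ExecutorService pool) {
        this.pool = pool;
    }

    // Submitting through this method guarantees that key and future type agree.
    <T> void submit(Class<T> type, Callable<T> task) {
        futures.put(type, pool.submit(task));
    }

    // The unchecked cast is safe only because submit() is the sole writer.
    @SuppressWarnings("unchecked")
    <T> Future<T> get(Class<T> type) {
        return (Future<T>) futures.get(type);
    }

    // String and Integer are stand-ins for the return types A and B.
    static String demo() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            TypedFutures typed = new TypedFutures(pool);
            typed.submit(String.class, () -> "one");
            typed.submit(Integer.class, () -> 2);
            String s = typed.get(String.class).get();   // no cast at the call site
            Integer i = typed.get(Integer.class).get();
            return s + ":" + i;
        } finally {
            pool.shutdown();
        }
    }
}
```

A plain Map keyed by class allows only one future per result type. If two methods return the same type, a MultiMap or a dedicated key object would be needed instead.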
Recipe: combining the ingredients into a solution
- Create the Map/MultiMap as defined above, e.g. by making multiple calls to your dispatch method.
- Use waitForAll with the list of the map's values.
- Retrieve the relevant results from the Map/MultiMap using the get method described above.
You are trying to do something like fork-join or map-reduce. You may be able to find an established mechanism for this instead of reinventing the wheel.
Anyway, back to your specific problem of waiting for all methods to complete and then moving on:
As you mentioned, you should not lose track of the futures. So create a structure, Result, that can hold all the futures. Wait on the Result in the current thread. Start another thread inside Result that monitors the futures and notifies when all the methods have returned.
When the Result notifies, you move forward in the current thread with all of the returned data held by the Result object.
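One established mechanism that may fit here is CompletableFuture (Java 8 and later), whose allOf method returns a future that completes once all the given futures have completed. The sketch below is only an illustration of that idea, not the Result structure described above; the class name AllOfSketch is made up, and String and Integer stand in for the different return types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class for illustration.
class AllOfSketch {
    static String demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Each future keeps its own result type
            CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "A", pool);
            CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2, pool);

            // Blocks the current thread until both tasks have completed
            CompletableFuture.allOf(a, b).join();

            // Both futures are complete now, so join() returns immediately
            return a.join() + b.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Since the typed references a and b stay in scope, no cast is needed when reading the results; allOf itself only signals completion and carries no values.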
Simple (limited) solution: this is only really useful if you can define an interface or superclass for the return values (similar to sql.ResultSet). Otherwise it does not help much, because the switch reappears when you work with the results, since that is where you have to tell the types apart.
Dispatching:
dispatcher.dispatch(new SuperABC[] {new A(), new B(), new C()});
// dispatcher.dispatch(new A(), new B(), new C()); with varargs (...) notation, see the comment in the dispatch method below
Interfaces:
public interface Result { ... }
public interface SuperABC extends Callable<Result> {}
Example class:
public class A implements SuperABC {
public Result call() throws Exception { ... }
}
Dispatch method:
public Result[] dispatch(SuperABC[] tasks) // or: dispatch(SuperABC... tasks)
        throws InterruptedException, ExecutionException {
    List<Future<Result>> refs = new ArrayList<Future<Result>>(tasks.length);
    Result[] ret = new Result[tasks.length];
    // Submit everything first so the tasks run concurrently
    for (SuperABC task : tasks)
        refs.add(cachedThreadPool.submit(task));
    // Then collect the results in submission order
    for (int i = 0; i < tasks.length; i++)
        ret[i] = refs.get(i).get();
    return ret;
}