Variable callers with different return types

The problem I have in hand is that I have methods one(), two(), three(), four()

that have different types of return words, say A, B, C, D

, and I need to spawn variable numbers of threads (one for each method depending on the use case. Would like to call a subset of the methods at the same time.) I now use cachedThreadPool

to send these calls. Below is the code:

public class Dispatcher {

  public void dispatch(List<MethodNames> methodNames) {
    //Now I am going to iterate through the list of methodNames
    //And submit each method to the `ExecutorService`
    for(MethodNames m : methodNames) {
      switch(m) {
        case ONE: //submit one()
                  //wait and get the future
                  Future<A> future = cachePool.submit(new Callable<A>() {
                    @Override
                    public A call() {
                      return one();
                    });
                  A response = future.get(); 
             break;
        ....
      }
    }
  }
}

public enum MethodNames {
  ONE, TWO, THREE, FOUR
}

//Example methods:
public A one() {
}

public B two() {
}

      

My question is how to do the above so that all method calls are made without having to wait for completion. Also, how can I collect everything futures

and wait for it to complete so that all futures have a different generic type Future<A>, Future<B>

, etc. I'm making the call submit()

inside a case statement, so I don't have access to the returned Future<T>

outside of the case. Now I could do if, else

a loop instead for

, but I'm trying to figure out if there is a better way to achieve this.

+3


source to share


6 answers


I would do it like this -

  • Create an interface, say I

    .
  • Let classes A

    , B

    , C

    and D

    implement I

    .
  • Use enums valueOf

    and object overriding

    to remove case case.
  • Use polymorphism and return I

    from all methods.
  • Here is the code (not including A

    , B

    , C

    , D

    , I

    ), as they are a simple class and an interface - not a lot.

Below is the code:

Dispatcher.java

package com.test.thread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class Dispatcher {

public void dispatch() throws InterruptedException, ExecutionException {
    Map<MethodNames, Future<I>> reponse = new HashMap<MethodNames, Future<I>>();
    ExecutorService cachePool = Executors.newCachedThreadPool();
    for (MethodNames methodNames : MethodNames.values()) {
        Future<I> future = cachePool.submit(methodNames.worker());
        reponse.put(methodNames, future);
    }
    cachePool.awaitTermination(5, TimeUnit.MINUTES);
    for(MethodNames key : reponse.keySet()) {
        I result = reponse.get(key).get();
        System.out.println("result :: " + result);
    }
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
    new Dispatcher().dispatch();
}

      



}

MethodNames.java

package com.test.thread;

import java.util.concurrent.*;

public enum MethodNames {
ONE {
    @Override
    public Callable<I> worker() {
        return new Callable<I>() {
            @Override
            public I call() throws InterruptedException {
                System.out.println("Thread1");
                TimeUnit.SECONDS.sleep(30);
              return new A();
            }};
    }
},
TWO {
    @Override
    public Callable<I> worker() throws InterruptedException {
        return new Callable<I>() {
            @Override
            public I call() throws InterruptedException {
                System.out.println("Thread2");
                TimeUnit.SECONDS.sleep(30);
              return new B();
            }};
    }
},
THREE {
    @Override
    public Callable<I> worker() throws InterruptedException {
        return new Callable<I>() {
            @Override
            public I call() throws InterruptedException {
                System.out.println("Thread3");
                TimeUnit.SECONDS.sleep(30);
              return new C();
            }};
    }
},
FOUR {
    @Override
    public Callable<I> worker() throws InterruptedException {
        return new Callable<I>() {
            @Override
            public I call() throws InterruptedException {
                System.out.println("Thread");
                TimeUnit.SECONDS.sleep(30);
              return new D();
            }};
    }
};
public abstract Callable<I> worker() throws InterruptedException;

      

}

+2


source


It seems better to separate get from future, so add Callable as parameter to enum. Then the instant can create the Future. Unfortunately, for typical typing, the required class must be saved and used for correct printing. In the meantime, there is no need to know about it. ”



public enum MethodNames {
    ONE(A.class, () -> { one() }),
    TWO(B.class, () -> { two() }),
    ...
    FOUR(D.class, () -> { four() });

    private final Class<?> resultType;
    private final Future<?> future;
    private <T> MethodNames(Class<T> resultType, Callable<T> callable) {
        this.resultType = resultType;
        future = cachePool.submit(callable);
    }

    public <T> T getResponse(Class<T> type) {
        Object response = future.get();
        return resultType.asSubclass(type).cast(response);
    }
}

      

+1


source


If it's only the Callables

one you are posting to ExecutorService

, you can call awaitTermination on cachePool

after submitting your assignments (it might be Runnable

instead Callable

)

public class Dispatcher {
  public void dispatch(List<MethodNames> methodNames) {
    for(MethodNames m : methodNames) {
      switch(m) {
        case ONE: //submit one()
                  cachePool.execute(new Runnable() {
                    @Override
                    public void run() {
                      // do work
                    });
             break;
        ....
      }
    }
  }
  cachePool.awaitTermination(100, TimeUnit.HOURS);
}

      

If cachePool

there are other unrelated tasks in, or for some other reason you cannot use awaitTermination

, then you can block the Semaphore . Initialize Semaphore

with zero permissions, each task will release

resolve when it completes, and the method dispatch

locks on semaphore.acquire(methodNames.size())

, which waits until all tasks have been called release

(and therefore completed). Pay attention to the block try-finally

in Runnable

, otherwise, if it Runnable

throws an exception, it will not call release

, and the method dispatch

will block forever.

public class Dispatcher {
  public void dispatch(List<MethodNames> methodNames) {
    Semaphore semaphore = new Semaphore(0);
    for(MethodNames m : methodNames) {
      switch(m) {
        case ONE: //submit one()
                  cachePool.execute(new Runnable() {
                    @Override
                    public void run() {
                      try {
                        // do work
                      } finally {
                        semaphore.release();
                      }
                    });
             break;
        ....
      }
    }
  }
  semaphore.acquire(methodNames.size());
}

      

If you are collecting the results of tasks (this does not seem like what you are doing at the moment, but the requirements tend to change), then each Runnable

can store their result in a common ConcurrentLinkedQueue or some other thread-safe data structure (or one data structure for of each return type, etc.) and then dispatch

can handle those results when semaphore.acquire

or is awaitTermination

unlocked.

+1


source


Ingredients

You need to approach the result in several steps:

Waiting for multiple futures

In this case, you can just use Future<?>

in the signature, because just to wait, you don't need to know about the type of the result. This way you can just create a method void waitForAll(List< Future<?> > futures)

.

Getting a safe result from an unknown future

To do this, you need some kind of descriptor that knows about the type Future

. Due to Java type erasure, this descriptor has to be preserved somehow Class<T>

. So the simplest descriptor would be the corresponding one Future<T>

(c T

being one of A

and B

in your example).

So, you can just store the futures in Map<Class<?>, Future<?>)

(or a MultiMap

) with an additional type get method Future<T> get<T>(Class<T> handle)

.

You can replace your enum MethodNames

with this descriptor.

Receipt: combining ingredients into solution

  • Create Map

    / MultiMap

    as defined above eg. by refusing multiple calls to your method dispatch

    .
  • Use waitAll with a list of map values
  • Retrieve the relevant results from Map

    / MultiMap

    using the get method described above.
0


source


You are trying to do something like fork-join or map-reduce. You can find an established mechanism for this, instead of reinventing the wheel.

Anyway, back to your specific problem waiting for all methods to complete and move:

As you mentioned, you shouldn't lose track of the future. Therefore, create a structure Result that can contain all futures. waiting on Result in the current thread. Start another thread inside Result that will track the futures and notify when all methods return.

When the Result notifies, you move forward in the current thread with all of the returned data stored in the Result object .

0


source


Simple (limited) solution : this is really useful if you can define an interface / superclass for return values ​​(similarly sql.ResultSet

), otherwise not much ... then the switch reappears when working with the results because you have to use it.

Sending

dispatcher.dispatch(new SuperABC[] {new A(), new B(), new C()});
// dispatcher.dispatch(new A(), new B(), new C()); with ... notation, see comment below

      

Interfaces:

public interface Result { ... }
public interface SuperABC extends Callable<Result> {}

      

Example class:

public class A implements SuperABC {
    public Result call() throws Exception { ... }
}

      

method of sending:

public Result[] dispatch(SuperABC[] tasks) { // dispatch(SuperABC... tasks)
    List<Future<Result>> refs = new ArrayList<Future<Result>>(tasks.length);
    Result[] ret = new Result[tasks.length];
    for (SuperABC task : tasks)
        refs.add(cachedThreadPool.submit(task));
    for (int i = 0; i < tasks.length; i++)
        ret[i] = refs.get(i).get();
    return ret;
}

      

0


source







All Articles