Default ForkJoinPool executor takes a long time

I am working with CompletableFuture to asynchronously execute a stream of tasks generated from a source list.

So I am testing the overloaded method supplyAsync of CompletableFuture, where one overload takes only a Supplier parameter and the other takes a Supplier parameter and an Executor parameter. Here is the documentation for both:

First:

supplyAsync(Supplier<U> supplier)

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

Second:

supplyAsync(Supplier<U> supplier, Executor executor)

Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

And here is my test class:

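import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TestCompleteableAndParallelStream {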

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());

        useCompletableFuture(tasks);

        useCompletableFutureWithExecutor(tasks);

    }

    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());

          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
          executor.shutdown();
        }

    public static void useCompletableFuture(List<MyTask> tasks) {
          long start = System.nanoTime();
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                   .collect(Collectors.toList());

          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
        }



}


class MyTask {
      private final int duration;
      public MyTask(int duration) {
        this.duration = duration;
      }
      public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
          Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
          throw new RuntimeException(e);
        }
        return duration;
      }
    }

      

While the useCompletableFuture method takes about 4 seconds, the useCompletableFutureWithExecutor method takes only 1 second.

Now my question: what other processing does ForkJoinPool.commonPool() do that might cause this overhead? And in which cases should we not simply prefer a custom executor pool over ForkJoinPool?

+3




2 answers


Check the size of ForkJoinPool.commonPool(). By default, the pool is created with a size of

Runtime.getRuntime().availableProcessors() - 1
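To see the value on a given machine, the common pool can be queried directly. This is a minimal sketch: the class name CommonPoolSize and the helper defaultParallelism are illustrative, and the helper only mirrors the default rule (processors minus one, with a floor of one), assuming no system property overrides it.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolSize {

    // Mirrors the default sizing rule: one thread fewer than the number of
    // processors, but never less than one. (Illustrative helper, not a JDK method.)
    static int defaultParallelism(int processors) {
        return Math.max(1, processors - 1);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors:    " + processors);
        System.out.println("Expected default size:   " + defaultParallelism(processors));
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```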

      

I ran your example on my Intel i7-4800MQ (4 cores + 4 virtual cores), where the size of the common pool is 7, so the whole computation took ~2000 ms:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

      

In the second case, you used

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

      

so the pool has 10 threads ready to perform computations, and all tasks finish in ~1000 ms:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
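The two timings follow from simple arithmetic: every task sleeps for one second, so the tasks run in "waves" of at most one task per thread. Here is a rough sketch of that estimate, assuming equal-length tasks that only block and ignoring scheduling overhead (the class and method names are illustrative):

```java
class WaveEstimate {

    // Number of sequential "waves" needed when at most `threads` tasks run at once.
    static int waves(int tasks, int threads) {
        return (tasks + threads - 1) / threads; // integer ceiling of tasks / threads
    }

    // Estimated wall-clock time in milliseconds for equal-length blocking tasks.
    static long estimateMillis(int tasks, int threads, long millisPerTask) {
        return waves(tasks, threads) * millisPerTask;
    }

    public static void main(String[] args) {
        System.out.println(estimateMillis(10, 7, 1000));  // 7 common-pool workers -> 2000
        System.out.println(estimateMillis(10, 10, 1000)); // 10 fixed-pool threads -> 1000
        System.out.println(estimateMillis(10, 3, 1000));  // 3 workers -> 4000
    }
}
```

With three workers the estimate is about 4 seconds, which would be consistent with the timing reported in the question if that machine has 4 available processors (an assumption; the question does not say).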

      

Difference between ForkJoinPool and ExecutorService



Eugene mentions another important thing in his comment: ForkJoinPool uses a work-stealing approach:

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

An ExecutorService created with .newFixedThreadPool(), by contrast, keeps a fixed number of threads that all take tasks from a single shared queue; there is no splitting into subtasks and no stealing between workers.
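As an illustration of the kind of workload the quoted passage has in mind, here is a small sketch of a task that splits itself into subtasks, which is where work-stealing pays off. The class name RangeSum and the threshold of 1,000 elements are arbitrary choices for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums the integers in [from, to) by recursively splitting the range.
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum the range directly.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                     // queue the left half; an idle worker may steal it
        long rightSum = right.compute(); // compute the right half in the current thread
        return left.join() + rightSum;
    }

    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 1_000_001)); // sum of 0..1,000,000
    }
}
```

A task that only blocks, such as the Thread.sleep call in the question, gains nothing from this: the sleeping worker simply occupies one of the pool's few threads.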

How do I determine the size of the pool?

There was a question about the best thread pool size; you can find some useful information there:

Setting the ideal thread pool size

Also this thread is a good place to explore:

Custom thread pool in Java 8 parallel stream
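One commonly cited rule of thumb (described in Java Concurrency in Practice) sizes a pool as processors × target utilization × (1 + wait time / compute time). A minimal sketch of that heuristic follows; the class name, the method name, and the sample numbers are illustrative assumptions, not measurements.

```java
class PoolSizing {

    // threads = processors * utilization * (1 + waitTime / computeTime)
    static int suggestedThreads(int processors, double utilization,
                                double waitMillis, double computeMillis) {
        double threads = processors * utilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.ceil(threads));
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        // Hypothetical task profile: 90 ms of waiting for every 10 ms of computation.
        System.out.println(suggestedThreads(processors, 1.0, 90, 10));
    }
}
```

For tasks that mostly block, like the ones in the question, the heuristic suggests far more threads than processors, which fits the observation that the 10-thread pool finished sooner.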

+2




Checking further solutions online, I found that we can change the default pool size that ForkJoinPool uses with the following property:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

      



Thus, this property can help make better use of ForkJoinPool by allowing more parallelism.
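The same property can also be set programmatically, as long as that happens before the common pool is first touched, since the pool reads the property when it is created. A minimal sketch, with an illustrative class name and the value 16 taken from the flag above:

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolConfig {
    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Must be called before anything uses ForkJoinPool; setting the property
    // later has no effect on an already-created common pool.
    static void configure(int parallelism) {
        System.setProperty(PARALLELISM_PROPERTY, Integer.toString(parallelism));
    }

    public static void main(String[] args) {
        configure(16);
        System.out.println(ForkJoinPool.getCommonPoolParallelism());
    }
}
```

Keep in mind that the common pool is shared across the JVM (parallel streams use it too), so a dedicated executor may still be the better fit for blocking work.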

+2

