Fixing the stream of streams

I found the hard way, that the JVM only uses one thread pool to process threads in parallel. We had a blocked I / O function on a large thread that was causing liveness issues for unrelated and otherwise fast functions used with unrelated parallel threads.

There are no methods on the thread that allow you to use an alternate thread pool.

Is there an easy way to avoid this problem, perhaps specifying somehow which thread pool to use?

+3


source to share


3 answers


You can complete the blocking operation ForkJoinPool.ManagedBlocker

according to this:

static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;

        @Override
        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    @Override
                    public boolean block() {
                        result = supplier.get();
                        return true;
                    }

                    @Override
                    public boolean isReleasable() {
                        return result != null;
                    }
                });
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return result;
        }
    };
}

      

Use it for example as such:

Stream.generate(blocking(() -> ...))
      .parallel()
      ...
      .collect(...);

      

More information can be found on this blog: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/



jOOλ provides wrappers for all Java 8 types FunctionalInterface

as above through org.jooq.lambda.Blocking

, so you can write:

Stream.generate(Blocking.supplier(() -> ...))
      .parallel()
      ...
      .collect(...);

      

Or, for example, when a filter is blocked:

Stream....
      .parallel()
      .filter(Blocking.predicate(t -> blockingTest(t)))
      .collect(...);

      

(Disclaimer, I work for a company for jOOλ).

+2


source


I wrote a small library called StreamEx that can send a task to a custom FJP. So you can write

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
int[] primes = IntStreamEx.range(1, 1_000_000)
    .parallel(forkJoinPool)
    .filter(PrimesPrint::isPrime).toArray();

      



It just remembers your pool and starts a terminal operation inside it, concatenating the result. Just syntactic sugar for the above solution.

+1


source


Maybe it looks like Custom thread pool in Java 8 parallel thread

The problem is discussed in this blog .

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

      

0


source







All Articles