Fixing the stream of streams
I found out the hard way that the JVM uses only one thread pool (the common ForkJoinPool) to process streams in parallel. We had a blocking I/O function on a large stream that was causing liveness issues for unrelated and otherwise fast functions used with unrelated parallel streams.
There are no methods on the stream that allow you to use an alternate thread pool.
Is there an easy way to avoid this problem, perhaps by specifying which thread pool to use?
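To make the shared pool visible, here is a minimal sketch that records which threads a parallel stream actually runs on. The class and method names are made up for illustration; on a typical JVM the set contains the calling thread plus several `ForkJoinPool.commonPool-worker-N` threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Runs a parallel stream and records the name of every thread that processed an element.
    static Set<String> threadNamesUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Every parallel stream in the JVM competes for these same worker threads.
        System.out.println(threadNamesUsed());
    }
}
```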
You can wrap the blocking operation in a ForkJoinPool.ManagedBlocker, like this:
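import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return () -> {
        // A fresh blocker per call: keeping the result in a field of the Supplier
        // would return a stale value on later calls and leak it across threads.
        class Blocker implements ForkJoinPool.ManagedBlocker {
            T result;
            boolean done;
            @Override
            public boolean block() {
                result = supplier.get();
                done = true;
                return true; // no further blocking is necessary
            }
            @Override
            public boolean isReleasable() {
                return done; // also correct when the supplier returns null
            }
        }
        Blocker blocker = new Blocker();
        try {
            // Lets the pool add a compensating thread while this one is blocked.
            ForkJoinPool.managedBlock(blocker);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
        return blocker.result;
    };
}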
Use it, for example, like this:
Stream.generate(blocking(() -> ...))
.parallel()
...
.collect(...);
More information can be found on this blog: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/
jOOλ provides wrappers like the one above for all Java 8 FunctionalInterface types through org.jooq.lambda.Blocking, so you can write:
Stream.generate(Blocking.supplier(() -> ...))
.parallel()
...
.collect(...);
Or, for example, when a filter is blocking:
Stream....
.parallel()
.filter(Blocking.predicate(t -> blockingTest(t)))
.collect(...);
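For readers without jOOλ on the classpath, the same idea can be sketched by hand for a predicate. This is an illustration under stated assumptions, not jOOλ's implementation: `blockingPredicate`, `blockingTest` and `evens` are hypothetical names, and the sleep merely stands in for real blocking I/O.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BlockingPredicateSketch {

    // Hypothetical helper: runs each predicate test inside ForkJoinPool.managedBlock.
    static <T> Predicate<T> blockingPredicate(Predicate<T> predicate) {
        return t -> {
            // Fresh blocker per element, so no state is shared between calls or threads.
            class Blocker implements ForkJoinPool.ManagedBlocker {
                boolean result;
                boolean done;
                @Override
                public boolean block() {
                    result = predicate.test(t);
                    done = true;
                    return true; // no further blocking is necessary
                }
                @Override
                public boolean isReleasable() {
                    return done;
                }
            }
            Blocker blocker = new Blocker();
            try {
                ForkJoinPool.managedBlock(blocker);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return blocker.result;
        };
    }

    // Stand-in for a slow, blocking check (hypothetical).
    static boolean blockingTest(int i) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i % 2 == 0;
    }

    // Filters 0..n-1 in parallel using the wrapped blocking predicate.
    static List<Integer> evens(int n) {
        return IntStream.range(0, n).boxed()
                .parallel()
                .filter(blockingPredicate(t -> blockingTest(t)))
                .collect(Collectors.toList());
    }
}
```

The blocker is created per element rather than once per wrapper, so parallel workers never see each other's results.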
(Disclaimer: I work for the company behind jOOλ.)
I wrote a small library called StreamEx that can submit a task to a custom ForkJoinPool (FJP). So you can write:
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
int[] primes = IntStreamEx.range(1, 1_000_000)
.parallel(forkJoinPool)
.filter(PrimesPrint::isPrime).toArray();
It just remembers your pool and launches the terminal operation inside it, joining the result. It is just syntactic sugar for submitting the stream to the pool yourself with ForkJoinPool.submit(...).
This looks similar to the question "Custom thread pool in Java 8 parallel stream".
The problem is also discussed in this blog.
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();
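A self-contained variant of this trick, written as a sketch, records the worker thread names to confirm that the work stays in the dedicated pool. It relies on parallel stream tasks being forked into the pool of the thread that starts the terminal operation, which is observed behaviour of current JDKs rather than a documented guarantee. The class and method names are illustrative.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolDemo {

    // Runs a parallel stream from inside a dedicated pool and returns the thread names used.
    static Set<String> threadNamesUsed() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            // The terminal operation starts on a worker of 'pool', so its subtasks
            // are forked into 'pool' instead of the common pool.
            pool.submit(() ->
                IntStream.range(0, 10_000)
                         .parallel()
                         .forEach(i -> names.add(Thread.currentThread().getName()))
            ).get();
            return names;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdown(); // dedicated pools are not reclaimed automatically
        }
    }
}
```

Shutting the pool down in a `finally` block matters when pools are created per request; otherwise their worker threads linger.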