Java parallel stream only uses one thread?

I am processing data using Java 8 lambdas with parallel streams. My code looks like this:

ForkJoinPool forkJoinPool = new ForkJoinPool(10);
List<String> files = Arrays.asList(new String[]{"1.txt"}); 
List<String> result = forkJoinPool.submit(() ->
    files.stream().parallel()
        .flatMap(x -> stage1(x)) //at this stage we add more elements to the stream
        .map(x -> stage2(x))
        .map(x -> stage3(x))
        .collect(Collectors.toList())
).get();

      

The stream starts with one element, but the second stage adds more elements. I expected this stream to run in parallel, but in this case only one worker thread is used.

If I start with 2 elements (i.e. add a second element to the original list), then 2 threads are used for the stream, and so on. This also happens if I do not explicitly submit the stream to a ForkJoinPool.

Question: Is this documented behavior, or could it change with the implementation? Is there a way to control this behavior and allow more threads regardless of the size of the original list?



2 answers


What you are observing is implementation-specific behavior, not specified behavior.

The current JDK 8 implementation looks at the Spliterator of the outermost stream and uses it as the basis for splitting the parallel workload. Since the example has only one element in the original source stream, it cannot be split and the stream runs single-threaded. This works well for the common (but by no means only) case where flatMap returns zero, one, or only a few elements, but in the case where it returns many elements, they are all processed sequentially. In fact, the stream returned by the function passed to flatMap is forced into sequential mode. See line 270 of ReferencePipeline.java.
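To see this effect on your own JDK, a small probe along the following lines may help. It is only a sketch: the class name FlatMapThreadProbe and the stage1 stand-in are hypothetical, and the set of thread names it prints depends on the JDK version and the machine, so no particular output is claimed here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapThreadProbe {
    // Hypothetical stand-in for stage1: expands one file name into many items.
    static Stream<String> stage1(String file) {
        return IntStream.range(0, 1000).mapToObj(i -> file + "#" + i);
    }

    // Runs the pipeline and records the name of every thread that executes the map step.
    static Set<String> threadsUsed(List<String> files) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        files.stream().parallel()
             .flatMap(FlatMapThreadProbe::stage1)
             .map(x -> { names.add(Thread.currentThread().getName()); return x; })
             .collect(Collectors.toList());
        return names;
    }

    public static void main(String[] args) {
        System.out.println("1 source element:  " + threadsUsed(Arrays.asList("1.txt")));
        System.out.println("2 source elements: " + threadsUsed(Arrays.asList("1.txt", "2.txt")));
    }
}
```

Comparing the two printed sets shows how many threads took part for each source size on the JDK you are running.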



The "obvious" thing to do in the implementation would be to make this inner stream parallel, or at least not force it to be sequential. This may or may not improve the situation. Most likely it would improve some cases and make others worse. A better policy is certainly needed here, but I'm not sure what it would look like.

Also note that the technique used to force a parallel stream to run in a fork-join pool of your choice, by submitting a task that executes the pipeline to that pool, is also implementation-specific behavior. It works this way in JDK 8, but it may change in the future.
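If the goal is simply to get more threads working on the expanded elements, one possible workaround is to split the pipeline in two: collect the flatMap output into a list first, then start a new parallel stream from that list so the larger collection becomes the outermost source. The following is a minimal sketch, not a definitive fix; the class name TwoPhasePipeline and the bodies of stage1, stage2 and stage3 are hypothetical placeholders for the question's methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class TwoPhasePipeline {
    // Hypothetical stand-ins for the question's stage1, stage2 and stage3.
    static Stream<String> stage1(String file) {
        return IntStream.range(0, 100).mapToObj(i -> file + "#" + i);
    }
    static String stage2(String s) { return s.toUpperCase(); }
    static String stage3(String s) { return s + "!"; }

    static List<String> process(List<String> files) {
        // Phase 1: expand the small source into a concrete list.
        List<String> expanded = files.stream()
                                     .flatMap(TwoPhasePipeline::stage1)
                                     .collect(Collectors.toList());
        // Phase 2: the expanded list is now the outermost source, so it can be split.
        return expanded.parallelStream()
                       .map(TwoPhasePipeline::stage2)
                       .map(TwoPhasePipeline::stage3)
                       .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = process(Arrays.asList("1.txt"));
        System.out.println(result.size() + " items, first = " + result.get(0));
    }
}
```

The trade-off is that phase 1 runs stage1 sequentially for a single source element and holds all expanded elements in memory at once, so this only suits cases where the expanded data fits comfortably in memory.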



You can try either the LazyFutureStream or EagerFutureStream stream implementations from simple-react. Both streams will create a CompletableFuture for each processor, each of which can be executed on a separate thread. This can lead to more efficient processing (depending on your actual usage and resources).

For example:

LazyFutureStream.parallelBuilder(10)
                .of("1.txt")
                .flatMap(x -> stage1(x)) 
                .map(x -> stage2(x))
                .map(x -> stage3(x))
                .collect(Collectors.toList());

      



or

EagerFutureStream.parallelBuilder(10)
                .of("1.txt")
                .flatMap(x -> stage1(x)) 
                .map(x -> stage2(x))
                .map(x -> stage3(x))
                .collect(Collectors.toList());

      
