Collect results from a parallel stream

I have code like this:

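List<Egg> eggs = hens.parallelStream().map(hen -> {
    // Renamed from "eggs": a lambda may not redeclare a local variable
    // that is already in scope (the outer "eggs" being initialized).
    ArrayList<Egg> henEggs = new ArrayList<>();
    while (hen.hasEgg()) {
        henEggs.add(hen.getEgg());
    }
    return henEggs;
}).flatMap(Collection::stream).collect(Collectors.toList());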


But this way I have to create an ArrayList for each hen, and the eggs are not collected until that hen has been fully processed. I would like something like this:

// Pseudocode: Java has no "yield return", so this does not compile
List<Egg> eggs = hens.parallelStream().map(hen -> {
    while (hen.hasEgg()) {
        yield return hen.getEgg();
    }
}).collect(Collectors.toList());


But Java has no yield return. Is there a way to implement something like it?


3 answers


Your Hen class is poorly suited to the Stream API. If you cannot change it and it has no other useful methods (such as Collection<Egg> getAllEggs() or Iterator<Egg> eggIterator()), you can create a stream of eggs like this:

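import java.util.Iterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Adapts the hasEgg()/getEgg() pair to an Iterator and wraps it in a stream.
public static Stream<Egg> eggs(Hen hen) {
    Iterator<Egg> it = new Iterator<Egg>() {
        @Override
        public boolean hasNext() {
            return hen.hasEgg();
        }

        @Override
        public Egg next() {
            return hen.getEgg();
        }
    };
    // Unknown size, no extra characteristics, sequential (non-parallel) stream.
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}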


Now you can use it like this:



List<Egg> eggs = hens.parallelStream()
                     .flatMap(hen -> eggs(hen))
                     .collect(Collectors.toList());
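To see the adapter working end to end, here is a minimal self-contained sketch. The Egg and Hen classes are hypothetical stand-ins (a Hen that lays a fixed number of eggs, one per getEgg() call), and so are the class name HenIteratorDemo and the helper collectAll; only eggs(Hen) comes from the answer.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class HenIteratorDemo {

    // Hypothetical stand-in for the question's Egg class.
    static final class Egg {
        final int number;

        Egg(int number) {
            this.number = number;
        }
    }

    // Hypothetical Hen: hasEgg()/getEgg() act as a stateful cursor
    // over a fixed number of eggs, as described in the question.
    static final class Hen {
        private final int total;
        private int laid;

        Hen(int total) {
            this.total = total;
        }

        boolean hasEgg() {
            return laid < total;
        }

        Egg getEgg() {
            if (!hasEgg()) {
                throw new NoSuchElementException("no more eggs");
            }
            return new Egg(++laid);
        }
    }

    // The adapter from the answer: expose hasEgg()/getEgg() as an Iterator,
    // then wrap it in a sequential stream of unknown size.
    static Stream<Egg> eggs(Hen hen) {
        Iterator<Egg> it = new Iterator<Egg>() {
            @Override
            public boolean hasNext() {
                return hen.hasEgg();
            }

            @Override
            public Egg next() {
                return hen.getEgg();
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
    }

    // Builds one hen per entry of eggCounts and collects all eggs in parallel.
    static List<Egg> collectAll(int... eggCounts) {
        List<Hen> hens = Arrays.stream(eggCounts)
                .mapToObj(Hen::new)
                .collect(Collectors.toList());
        return hens.parallelStream()
                .flatMap(HenIteratorDemo::eggs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectAll(3, 0, 2).size()); // prints 5
    }
}
```

Parallelism here is only across hens: in the JDK implementation, flatMap consumes each inner stream sequentially on whichever thread processed that hen.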


Of course, a better Stream implementation may be possible if you can change the Hen class.
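One possible shape for that, assuming you are free to add a method to Hen: give it a hypothetical eggs() method backed by Spliterators.AbstractSpliterator. Its tryAdvance hands over at most one element per call, which is roughly what yield return inside the while loop would do, and the "is there an egg, then take it" step lives in a single method instead of being split across hasNext() and next(). Hen, Egg, eggs() and collectAll below are illustrative names, not part of any existing API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class HenSpliteratorDemo {

    // Hypothetical stand-in for the question's Egg class.
    static final class Egg {
        final int number;

        Egg(int number) {
            this.number = number;
        }
    }

    // Hypothetical, modified Hen that offers its eggs as a Stream.
    static final class Hen {
        private final int total;
        private int laid;

        Hen(int total) {
            this.total = total;
        }

        boolean hasEgg() {
            return laid < total;
        }

        Egg getEgg() {
            return new Egg(++laid);
        }

        // Each tryAdvance call hands over at most one egg, roughly what a
        // "yield return" inside the while loop would do.
        Stream<Egg> eggs() {
            Spliterator<Egg> spliterator = new Spliterators.AbstractSpliterator<Egg>(
                    Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
                @Override
                public boolean tryAdvance(Consumer<? super Egg> action) {
                    if (!Hen.this.hasEgg()) {
                        return false; // no more eggs: the stream ends here
                    }
                    action.accept(Hen.this.getEgg());
                    return true;
                }
            };
            return StreamSupport.stream(spliterator, false);
        }
    }

    static List<Egg> collectAll(int... eggCounts) {
        List<Hen> hens = Arrays.stream(eggCounts)
                .mapToObj(Hen::new)
                .collect(Collectors.toList());
        return hens.parallelStream()
                .flatMap(Hen::eggs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectAll(3, 0, 2).size()); // prints 5
    }
}
```

Long.MAX_VALUE is the value AbstractSpliterator expects when the size is not known in advance.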


The iterative logic using hasEgg() and getEgg() is stateful, because the results of these methods depend on previous calls. Therefore, the processing of a single Hen cannot be parallelized unless you manage to change the interface completely.

However, you don't need to worry about the ArrayList. When a stream implementation performs a collect operation in parallel, it has to buffer the values for each thread anyway and concatenate those buffers afterwards. It may even turn out that the operation does not benefit from parallel execution at all.



What you can do is replace the ArrayList with a Stream.Builder, as it is optimized for the use case of only adding elements until the Stream is built:

List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
    // Buffer this hen's eggs, then hand them to flatMap as a Stream.
    Stream.Builder<Egg> eggStream = Stream.builder();
    while (hen.hasEgg()) {
        eggStream.add(hen.getEgg());
    }
    return eggStream.build();
}).collect(Collectors.toList());
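A runnable version of the Stream.Builder approach, again with hypothetical Egg and Hen stand-ins (a Hen lays a fixed number of eggs) and a hypothetical collectAll helper. The lambda body is the one from the answer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HenBuilderDemo {

    // Hypothetical stand-in for the question's Egg class.
    static final class Egg {
        final int number;

        Egg(int number) {
            this.number = number;
        }
    }

    // Hypothetical Hen with the question's hasEgg()/getEgg() protocol.
    static final class Hen {
        private final int total;
        private int laid;

        Hen(int total) {
            this.total = total;
        }

        boolean hasEgg() {
            return laid < total;
        }

        Egg getEgg() {
            return new Egg(++laid);
        }
    }

    static List<Egg> collectAll(int... eggCounts) {
        List<Hen> hens = Arrays.stream(eggCounts)
                .mapToObj(Hen::new)
                .collect(Collectors.toList());
        return hens.parallelStream().flatMap(hen -> {
            // Drain this hen into a builder, then hand the built stream to flatMap.
            Stream.Builder<Egg> eggStream = Stream.builder();
            while (hen.hasEgg()) {
                eggStream.add(hen.getEgg());
            }
            return eggStream.build();
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectAll(3, 0, 2).size()); // prints 5
    }
}
```

Each hen's eggs are still buffered before being passed on; the builder replaces the ArrayList together with the extra flatMap(Collection::stream) step from the question.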


Assuming a getEggs() method exists, you can use the following to collect all the eggs:

List<Egg> eggs = hens.parallelStream()
    .filter(Hen::hasEggs)
    .map(Hen::getEggs)
    .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);


The code assumes that getEggs() returns a Collection. You can drop filter(Hen::hasEggs) if getEggs() returns an empty Collection when the Hen has no eggs.
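A self-contained sketch of this answer. Note that hasEggs() and getEggs() do not exist on the question's Hen (which only has hasEgg() and getEgg()); here they are hypothetical, and getEggs() is assumed to return an unmodifiable, possibly empty Collection. The flatMap variant is added only for comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class HenGetEggsDemo {

    // Hypothetical stand-in for the question's Egg class.
    static final class Egg {
        final int number;

        Egg(int number) {
            this.number = number;
        }
    }

    // Hypothetical Hen that offers bulk access to its eggs.
    static final class Hen {
        private final List<Egg> eggs;

        Hen(int count) {
            // Lay all eggs up front, numbered 1..count.
            this.eggs = IntStream.rangeClosed(1, count)
                    .mapToObj(Egg::new)
                    .collect(Collectors.toList());
        }

        boolean hasEggs() {
            return !eggs.isEmpty();
        }

        // Returns an empty collection (never null) when there are no eggs.
        Collection<Egg> getEggs() {
            return Collections.unmodifiableList(eggs);
        }
    }

    static List<Hen> hens(int... eggCounts) {
        return Arrays.stream(eggCounts).mapToObj(Hen::new).collect(Collectors.toList());
    }

    // The answer's version: three-argument collect merging via ArrayList::addAll.
    static List<Egg> collectWithAddAll(List<Hen> hens) {
        List<Egg> eggs = hens.parallelStream()
                .filter(Hen::hasEggs)
                .map(Hen::getEggs)
                .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
        return eggs;
    }

    // Equivalent result using flatMap and a standard collector.
    static List<Egg> collectWithFlatMap(List<Hen> hens) {
        return hens.parallelStream()
                .flatMap(hen -> hen.getEggs().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Hen> hens = hens(3, 0, 2);
        System.out.println(collectWithAddAll(hens).size());  // prints 5
        System.out.println(collectWithFlatMap(hens).size()); // prints 5
    }
}
```

With this particular Hen the filter is redundant, since getEggs() already returns an empty collection for a hen without eggs.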
