Collect results from a parallel stream
I have a code like this:
List<Egg> eggs = hens.parallelStream().map(hen -> {
ArrayList<Egg> eggs = new ArrayList<>();
while (hen.hasEgg()) {
eggs.add(hen.getEgg());
}
return eggs;
}).flatMap(Collection::stream).collect(Collectors.toList());
But this way I have to create an ArrayList for each chicken, and the eggs are not collected until the chicken is 100% processed. I would like something like this:
List<Egg> eggs = hens.parallelStream().map(hen -> {
while (hen.hasEgg()) {
yield return hen.getEgg();
}
}).collect(Collectors.toList());
But Java has no return on profitability. Is there a way to implement it?
source to share
Your class is Hen
poorly adapted to Stream API. If you cannot change it and it has no other useful methods (like Collection<Egg> getAllEggs()
or Iterator<Egg> eggIterator()
), you can create an egg stream like this:
public static Stream<Egg> eggs(Hen hen) {
Iterator<Egg> it = new Iterator<Egg>() {
@Override
public boolean hasNext() {
return hen.hasEgg();
}
@Override
public Egg next() {
return hen.getEgg();
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}
Now you can use it like this:
List<Egg> eggs = hens.parallelStream()
.flatMap(hen -> eggs(hen))
.collect(Collectors.toList());
Of course a better Stream
implementation might be possible if you can change the class Hen
.
source to share
The iterative logic using hasEgg()
and getEgg()
is stateful because the results of these methods depend on previous calls. Therefore, processing one Hen
cannot be parallelized unless you manage to completely change the interface.
However, you don't need to worry about ArrayList
. When a stream implementation performs a parallel operation collect
, it must buffer the values for each stream anyway, and concatenate those buffers afterwards. Perhaps even if the operation will not be implemented at all in parallel execution.
What you can do is replace ArrayList
with Stream.Builder
as it is optimized to be used for pre-build only Stream
:
List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
Stream.Builder<Egg> eggStream = Stream.builder();
while(hen.hasEgg()) {
eggStream.add(hen.getEgg());
}
return eggStream.build();
}).collect(Collectors.toList());
source to share
Assuming a method exists getEggs()
, you can use the following to collect all the eggs.
List<Egg> eggs = hens.parallelStream()
.filter(Hen::hasEggs)
.map(Hen::getEggs)
.collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
The code assumes it getEggs()
returns a Collection
. You can exclude filter(Hen::hasEggs)
if it getEggs()
returns empty Collection
when Hen
not Eggs
.
source to share