Java Collector.combiner gets called with provider values โโalways
Problem: Create a Collector implementation that will multiply a stream of integers in parallel and return Long.
Implementation:
public class ParallelMultiplier implements Collector<Integer, Long, Long> {
@Override
public BiConsumer<Long, Integer> accumulator() {
// TODO Auto-generated method stub
return (operand1, operand2) -> {
System.out.println("Accumulating Values (Accumulator, Element): (" + operand1 + ", " + operand2 + ")");
long Lval = operand1.longValue();
int Ival = operand2.intValue();
Lval *= Ival;
operand1 = Long.valueOf(Lval);
System.out.println("Acc Updated : " + operand1);
};
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
// TODO Auto-generated method stub
return Collections.unmodifiableSet(EnumSet.of(UNORDERED));
}
@Override
public BinaryOperator<Long> combiner() {
return (operand1, operand2) -> {
System.out.println("Combining Values : (" + operand1 + ", " + operand2 + ")");
long Lval1 = operand1.longValue();
long Lval2 = operand2.longValue();
Lval1 *= Lval2;
return Long.valueOf(Lval1);
};
}
@Override
public Function<Long, Long> finisher() {
// TODO Auto-generated method stub
return Function.identity();
}
@Override
public Supplier<Long> supplier() {
return () -> new Long(1L);
}
}
Observed output:
Accumulating Values (Accumulator, Element): (1, 4)
Acc Updated : 4
Accumulating Values (Accumulator, Element): (1, 3)
Acc Updated : 3
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 8)
Accumulating Values (Accumulator, Element): (1, 6)
Accumulating Values (Accumulator, Element): (1, 2)
Acc Updated : 2
Acc Updated : 8
Accumulating Values (Accumulator, Element): (1, 5)
Accumulating Values (Accumulator, Element): (1, 1)
Acc Updated : 5
Acc Updated : 6
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 7)
Acc Updated : 7
Combining Values : (1, 1)
Combining Values : (1, 1)
Acc Updated : 1
Combining Values : (1, 1)
Combining Values : (1, 1)
Combining Values : (1, 1)
Vocation:
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
Collector<Integer, Long, Long> parallelMultiplier = new ParallelMultiplier();
result = intList.parallelStream().collect(parallelMultiplier);
i.e. the result of the multiplication is 1, where it must be 8 factorial. I don't use "simultaneous" characteristics either. Ideally, I should have gotten the result of the multiplication of the substreams fed into the combiner () operation, but that doesn't seem to be happening here.
Keeping aside inefficient boxing / unboxing implementations, any clue where I could go wrong?
source to share
Your collector is a little off. Here's a simplified version (why you're not working - see the end):
static class ParallelMultiplier implements Collector<Integer, long[], Long> {
@Override
public BiConsumer<long[], Integer> accumulator() {
return (left, right) -> left[0] *= right;
}
@Override
public BinaryOperator<long[]> combiner() {
return (left, right) -> {
left[0] = left[0] * right[0];
return left;
};
}
@Override
public Function<long[], Long> finisher() {
return arr -> arr[0];
}
@Override
public Supplier<long[]> supplier() {
return () -> new long[] { 1L };
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.noneOf(Characteristics.class));
}
}
You can illustrate the following problems as follows:
static Long test(Long left, Long right) {
left = left * right;
return left;
}
long l = 12L;
long r = 13L;
test(l, r);
System.out.println(l); // still 12
source to share
As pointed out by Flown, Java primitive types are immutable and cannot be used as an accumulator. Since you are calculating multiplication in parallel, we want to use a thread-safe implementation of a mutable Long
, which is AtomicLong
.
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.*;
public class ParallelMultiplier implements Collector<Integer, AtomicLong, Long> {
@Override
public BiConsumer<AtomicLong, Integer> accumulator() {
return (operand1, operand2) -> operand1.set(operand1.longValue() * operand2.longValue());
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
}
@Override
public BinaryOperator<AtomicLong> combiner() {
return (operand1, operand2) -> new AtomicLong(operand1.longValue() * operand2.longValue());
}
@Override
public Function<AtomicLong, Long> finisher() {
return l -> l.longValue();
}
@Override
public Supplier<AtomicLong> supplier() {
return () -> new AtomicLong(1);
}
}
Test this by using what you provide leads to the correct answer 8! = 40320
.
source to share