Java Collector.combiner always gets called with supplier values

Problem: Create a Collector implementation that multiplies a stream of Integers in parallel and returns a Long.

Implementation:

public class ParallelMultiplier implements Collector<Integer, Long, Long> {

    @Override
    public BiConsumer<Long, Integer> accumulator() {
        // TODO Auto-generated method stub
        return (operand1, operand2) -> {
            System.out.println("Accumulating Values (Accumulator, Element): (" + operand1 + ", " + operand2 + ")");
            long Lval = operand1.longValue();
            int Ival = operand2.intValue();
            Lval *= Ival;
            operand1 = Long.valueOf(Lval);
            System.out.println("Acc Updated : " + operand1);
        };
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        // TODO Auto-generated method stub
        return Collections.unmodifiableSet(EnumSet.of(UNORDERED));
    }

    @Override
    public BinaryOperator<Long> combiner() {        
        return (operand1, operand2) -> {
            System.out.println("Combining Values : (" + operand1 + ", " + operand2 + ")");
            long Lval1 = operand1.longValue();
            long Lval2 = operand2.longValue();
            Lval1 *= Lval2;
            return Long.valueOf(Lval1);
        };
    }

    @Override
    public Function<Long, Long> finisher() {
        // TODO Auto-generated method stub
        return Function.identity();
    }

    @Override
    public Supplier<Long> supplier() {      
        return () -> new Long(1L);
    }

}

      

Observed output:

Accumulating Values (Accumulator, Element): (1, 4)
Acc Updated : 4
Accumulating Values (Accumulator, Element): (1, 3)
Acc Updated : 3
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 8)
Accumulating Values (Accumulator, Element): (1, 6)
Accumulating Values (Accumulator, Element): (1, 2)
Acc Updated : 2
Acc Updated : 8
Accumulating Values (Accumulator, Element): (1, 5)
Accumulating Values (Accumulator, Element): (1, 1)
Acc Updated : 5
Acc Updated : 6
Combining Values : (1, 1)
Accumulating Values (Accumulator, Element): (1, 7)
Acc Updated : 7
Combining Values : (1, 1)
Combining Values : (1, 1)
Acc Updated : 1
Combining Values : (1, 1)
Combining Values : (1, 1)
Combining Values : (1, 1)

      

Invocation:

List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
Collector<Integer, Long, Long> parallelMultiplier = new ParallelMultiplier();

Long result = intList.parallelStream().collect(parallelMultiplier);

      

That is, the result of the multiplication is 1, whereas it should be 8 factorial. I am not using the CONCURRENT characteristic either. Ideally, the products of the substreams should have been fed into the combiner() operation, but that does not seem to be happening here.

Setting aside the inefficient boxing/unboxing, any clue where I might be going wrong?

+3




2 answers


Your collector is a little off. Here's a simplified version (for why yours isn't working, see the end):

 static class ParallelMultiplier implements Collector<Integer, long[], Long> {

    @Override
    public BiConsumer<long[], Integer> accumulator() {

        return (left, right) -> left[0] *= right;
    }

    @Override
    public BinaryOperator<long[]> combiner() {
        return (left, right) -> {
            left[0] = left[0] * right[0];
            return left;
        };
    }

    @Override
    public Function<long[], Long> finisher() {
        return arr -> arr[0];
    }

    @Override
    public Supplier<long[]> supplier() {
        return () -> new long[] { 1L };
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.noneOf(Characteristics.class));
    }
}
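For trying this out quickly, the same logic can be sketched with the `Collector.of` factory instead of a full class. This is only an illustrative sketch: the class name `LongArrayProductDemo` and the helper `product()` are hypothetical names that do not appear in the question, and the input list is the one from the question's invocation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class LongArrayProductDemo {

    // Hypothetical helper: same logic as the long[] collector above, built with Collector.of.
    static Collector<Integer, long[], Long> product() {
        return Collector.of(
                () -> new long[] { 1L },          // supplier: fresh mutable container per substream
                (acc, value) -> acc[0] *= value,  // accumulator: mutates the container in place
                (left, right) -> {                // combiner: merges two partial products
                    left[0] *= right[0];
                    return left;
                },
                acc -> acc[0]);                   // finisher: unwraps the final product
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Long result = intList.parallelStream().collect(product());
        System.out.println(result); // 8! = 40320
    }
}
```

Because the container is a mutable `long[]`, whatever the accumulator writes is still visible when the combiner later receives that container.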

      



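The problem with your version can be illustrated as follows. Reassigning a parameter inside a method (or lambda) only rebinds the local variable; because Long is immutable, the caller's value never changes: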

static Long test(Long left, Long right) {
    left = left * right;
    return left;
}

long l = 12L;
long r = 13L;

test(l, r);
System.out.println(l); // still 12
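The same effect can be reproduced directly on an accumulator, which is why every "Combining Values" line in the question prints (1, 1). The sketch below is illustrative only; the class name and the constants `BROKEN` and `WORKING` are hypothetical names. It applies one accumulation step to a `Long` container and to a `long[]` container.

```java
import java.util.function.BiConsumer;

class AccumulatorReassignmentDemo {

    // Mirrors the question's accumulator: it reassigns its parameter instead of mutating the container.
    static final BiConsumer<Long, Integer> BROKEN = (acc, value) -> {
        acc = acc * value; // rebinds the lambda's local variable only
    };

    // Mirrors the long[] accumulator: it writes into the container itself.
    static final BiConsumer<long[], Integer> WORKING = (acc, value) -> acc[0] *= value;

    static long runBroken() {
        Long container = 1L;
        BROKEN.accept(container, 4);
        return container; // the caller's Long is untouched
    }

    static long runWorking() {
        long[] container = { 1L };
        WORKING.accept(container, 4);
        return container[0]; // the array element was updated
    }

    public static void main(String[] args) {
        System.out.println(runBroken());  // 1
        System.out.println(runWorking()); // 4
    }
}
```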

      

+4




As pointed out by Flown, Java's primitive wrapper types (such as Long) are immutable and cannot serve as a mutable accumulator container. Since the multiplication runs in parallel, a convenient choice is a thread-safe mutable long, which is AtomicLong.

import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.*;

public class ParallelMultiplier implements Collector<Integer, AtomicLong, Long> {

    @Override
    public BiConsumer<AtomicLong, Integer> accumulator() {
        return (operand1, operand2) -> operand1.set(operand1.longValue() * operand2.longValue());
    }

    @Override
    public Set<java.util.stream.Collector.Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    @Override
    public BinaryOperator<AtomicLong> combiner() {        
        return (operand1, operand2) -> new AtomicLong(operand1.longValue() * operand2.longValue());
    }

    @Override
    public Function<AtomicLong, Long> finisher() {
        return l -> l.longValue();
    }

    @Override
    public Supplier<AtomicLong> supplier() {      
        return () -> new AtomicLong(1);
    }

}

      



Testing this with the invocation you provided yields the correct answer: 8! = 40320.
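A possible way to run that check is sketched below. It rebuilds the AtomicLong collector with `Collector.of` and compares its result with a sequential product computed independently through `LongStream.reduce`. The class name and the `product()` and `expected()` helpers are hypothetical names introduced for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collector;
import java.util.stream.LongStream;

class AtomicLongProductCheck {

    // Hypothetical helper: the AtomicLong collector from this answer, expressed with Collector.of.
    static Collector<Integer, AtomicLong, Long> product() {
        return Collector.of(
                () -> new AtomicLong(1),
                (acc, value) -> acc.set(acc.get() * value),
                (left, right) -> new AtomicLong(left.get() * right.get()),
                AtomicLong::get,
                Collector.Characteristics.UNORDERED);
    }

    // Independent reference value: sequential product 1 * 2 * ... * 8.
    static long expected() {
        return LongStream.rangeClosed(1, 8).reduce(1L, (a, b) -> a * b);
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        long actual = intList.parallelStream().collect(product());
        System.out.println(actual + " == " + expected() + " ? " + (actual == expected()));
    }
}
```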

0

