What are the requirements of Stream.reduce()?
When using the reduce() operation on a parallel stream, the OCP exam book says that the arguments to reduce() must adhere to certain principles. These principles are as follows:
- The identity must be defined such that for all elements u in the stream, combiner.apply(identity, u) is equal to u.
- The accumulator operator op must be associative and stateless, so that (a op b) op c is equal to a op (b op c).
- The combiner operator must also be associative and stateless and compatible with the identity, so that for all u and t, combiner.apply(u, accumulator.apply(identity, t)) is equal to accumulator.apply(u, t).
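To make the three rules concrete, here is a minimal sketch of a three-argument reduce() that satisfies all of them. The class name ReduceContractDemo and the method totalLength are illustrative only, not from the book. The identity 0 is neutral for the accumulator, addition is associative and keeps no shared state, and Integer::sum is compatible with the accumulator, so a sequential and a parallel run should agree.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class: a reduction that respects all three rules.
class ReduceContractDemo {

    // Sums the lengths of the strings with the three-argument reduce().
    // identity:    0                            -> 0 + s.length() == s.length()
    // accumulator: (sum, s) -> sum + s.length() -> associative addition, no shared state
    // combiner:    Integer::sum                 -> combiner(u, accumulator(0, t)) == accumulator(u, t)
    static int totalLength(List<String> words, boolean parallel) {
        Stream<String> stream = parallel ? words.parallelStream() : words.stream();
        return stream.reduce(0, (sum, s) -> sum + s.length(), Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
        System.out.println(totalLength(words, false)); // 20
        System.out.println(totalLength(words, true));  // 20 as well, because the rules hold
    }
}
```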
The exam book gives two examples to illustrate these principles; see the following code.
Example for associativity:
System.out.println(Arrays.asList(1,2,3,4,5,6)
    .parallelStream()
    .reduce(0,(a,b) -> (a-b))); // NOT AN ASSOCIATIVE ACCUMULATOR
What the OCP book says about it:
It may output -21, 3, or some other value, because the accumulator function violates the associativity property.
Example for the identity requirement:
System.out.println(Arrays.asList("w","o","l","f")
    .parallelStream()
    .reduce("X", String::concat));
What the OCP book says about it:
You can see other problems if we use an identity parameter that is not truly an identity value. It can output XwXoXlXf. As part of the parallel process, the identity is applied to multiple elements in the stream, resulting in very unexpected data.
I don't understand these examples. In the accumulator example, the accumulator starts with 0 - 1 = -1, then -1 - 2 = -3, then -6, and so on up to -21. I understand that since the generated list is not synchronized, the results may be unpredictable because of possible race conditions, etc., but why is the accumulator not associative? Wouldn't a + b also cause unpredictable results? I really don't see what is wrong with the accumulator used in this example and why it is not associative, but then again, I still don't quite understand what the associativity principle means.
I don't understand the identity example either. I understand that the result could be XwXoXlXf if 4 separate threads started accumulating with the identity at the same time, but what does that have to do with the identity parameter itself? What exactly would be the correct identity to use?
I was hoping someone could enlighten me a bit on these principles.
Thanks
Why is the accumulator not associative?
This is not associative, as the order of subtraction determines the final result.
If you run a sequential Stream, you get the expected output:
0 - 1 - 2 - 3 - 4 - 5 - 6 = -21
On the other hand, with a parallel Stream the work is split across multiple threads. For example, if reduce is executed in parallel on 6 threads and the intermediate results are then combined, you might get a different result:
0 - 1     0 - 2     0 - 3     0 - 4     0 - 5     0 - 6
  -1        -2        -3        -4        -5        -6
     -1 - (-2)           -3 - (-4)           -5 - (-6)
         1                   1                   1
                  1 - 1
                    0 - 1
                     -1
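The six-thread tree above is only one possible split. The sketch below simulates a different one deterministically, assuming the list is cut into exactly two halves (the real fork/join splitting is decided by the runtime and may differ). Each half is folded from the identity, and the two partial results are combined with the same operator, as the two-argument reduce() does. The class SplitSimulation and its methods are hypothetical helpers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper that mimics one possible parallel split by hand.
class SplitSimulation {

    // Left fold of a chunk, starting from the identity: what one thread does with its part.
    static int fold(List<Integer> chunk, int identity, BinaryOperator<Integer> op) {
        int result = identity;
        for (int x : chunk) {
            result = op.apply(result, x);
        }
        return result;
    }

    // Assumed split: two halves, partial results combined with the same operator.
    static int splitInTwo(List<Integer> values, int identity, BinaryOperator<Integer> op) {
        int mid = values.size() / 2;
        int left = fold(values.subList(0, mid), identity, op);
        int right = fold(values.subList(mid, values.size()), identity, op);
        return op.apply(left, right);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);
        BinaryOperator<Integer> minus = (a, b) -> a - b;
        System.out.println(fold(values, 0, minus));       // -21 (sequential order)
        System.out.println(splitInTwo(values, 0, minus)); // 9: (0-1-2-3) - (0-4-5-6) = -6 - (-15)
    }
}
```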
Or, to make a long example short:
(1 - 2) - 3 = -4
1 - (2 - 3) = 2
Therefore, subtraction is not associative.
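If you want to check an operator yourself, a brute-force search over small integers is enough to find a counterexample like the one above. This is a sketch with a hypothetical class name; finding no counterexample in a small range is evidence, not a proof, of associativity.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical helper: searches for a triple where (a op b) op c != a op (b op c).
class AssociativityCheck {

    // Returns the first violating triple in [from, to], or empty if none was found.
    static Optional<int[]> findCounterexample(BinaryOperator<Integer> op, int from, int to) {
        for (int a = from; a <= to; a++) {
            for (int b = from; b <= to; b++) {
                for (int c = from; c <= to; c++) {
                    int left = op.apply(op.apply(a, b), c);
                    int right = op.apply(a, op.apply(b, c));
                    if (left != right) {
                        return Optional.of(new int[] {a, b, c});
                    }
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> minus = (a, b) -> a - b;
        BinaryOperator<Integer> plus = (a, b) -> a + b;
        // Prints "(1 - 1) - 1 != 1 - (1 - 1)"
        findCounterexample(minus, 1, 6).ifPresent(t ->
            System.out.printf("(%d - %d) - %d != %d - (%d - %d)%n", t[0], t[1], t[2], t[0], t[1], t[2]));
        System.out.println(findCounterexample(plus, 1, 6).isPresent()); // false
    }
}
```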
On the other hand, a + b does not cause the same problem, since addition is associative (i.e. (a + b) + c == a + (b + c)).
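One rough way to see this in practice is to run the same parallel reduction many times and collect the distinct results. The sketch below (hypothetical class name) assumes nothing about how the runtime splits the work: with addition the set should always be [21], while with subtraction its contents depend on the machine and the splitting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BinaryOperator;

// Hypothetical helper: repeats a parallel reduce and records every distinct result.
class RepeatedParallelReduce {

    static Set<Integer> distinctResults(BinaryOperator<Integer> op, int runs) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);
        Set<Integer> results = new TreeSet<>();
        for (int i = 0; i < runs; i++) {
            results.add(values.parallelStream().reduce(0, op));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(distinctResults((a, b) -> a + b, 1000)); // [21]
        // With subtraction the set depends on how the runtime splits the work,
        // so it may hold one or several values, often not including -21.
        System.out.println(distinctResults((a, b) -> a - b, 1000));
    }
}
```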
The problem with the identity example is that when the reduction is performed in parallel across multiple threads, an "X" is prepended to each intermediate result.
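The sketch below simulates this by hand, assuming the list is cut into fixed-size chunks (a hypothetical stand-in for the runtime's own splitting). Every chunk is folded starting from the identity, so each chunk contributes its own "X"; the number of "X"s in the output tells you how many chunks there were.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper: folds each chunk from the identity, then combines the chunks.
class IdentitySimulation {

    static String reduceInChunks(List<String> values, String identity,
                                 BinaryOperator<String> op, int chunkSize) {
        String combined = null;
        for (int start = 0; start < values.size(); start += chunkSize) {
            String partial = identity; // each chunk starts from the identity again
            for (String s : values.subList(start, Math.min(start + chunkSize, values.size()))) {
                partial = op.apply(partial, s);
            }
            combined = (combined == null) ? partial : op.apply(combined, partial);
        }
        return combined == null ? identity : combined;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("w", "o", "l", "f");
        System.out.println(reduceInChunks(letters, "X", String::concat, 4)); // Xwolf (one chunk, like a sequential run)
        System.out.println(reduceInChunks(letters, "X", String::concat, 2)); // XwoXlf
        System.out.println(reduceInChunks(letters, "X", String::concat, 1)); // XwXoXlXf
    }
}
```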
What exactly would be the correct identity to use?
If you change the identity value to "":
System.out.println(Arrays.asList("w","o","l","f")
    .parallelStream()
    .reduce("", String::concat));
you get "wolf" instead of "XwXoXlXf".
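As a self-contained check (class name hypothetical), the sketch below first verifies the identity rule for "" on each element and then runs the parallel reduction. Because "" is a true identity and concat is associative, the parallel result should match the sequential one and keep the encounter order of the list.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class for the corrected identity.
class EmptyIdentityDemo {

    // "" satisfies "".concat(u).equals(u) for every u, and concat is associative,
    // so the parallel result equals the sequential one.
    static String joinParallel(List<String> letters) {
        return letters.parallelStream().reduce("", String::concat);
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("w", "o", "l", "f");
        for (String u : letters) {
            System.out.println("".concat(u).equals(u)); // true for each element
        }
        System.out.println(joinParallel(letters)); // wolf
    }
}
```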
Let me give you two examples. First, where the identity is broken:
int result = Stream.of(1, 2, 3, 4, 5, 6)
.parallel()
.reduce(10, (a, b) -> a + b);
System.out.println(result); // 81 on my run
Basically, you have violated this rule: "The identity value must be an identity for the accumulator function. This means that for all u, accumulator(identity, u) is equal to u."
Or, to make it simpler, let's check whether this rule holds for some sample data from our stream:
Integer identity = 10;
BinaryOperator<Integer> combiner = (x, y) -> x + y;
boolean identityRespected = combiner.apply(identity, 1) == 1;
System.out.println(identityRespected); // prints false
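The same check can be run over every element rather than a single value. Below is a small generic sketch (the class IdentityCheck and method isIdentityFor are hypothetical). It uses equals() instead of == so that Integer caching does not affect the comparison. A sample-based check can expose a wrong identity, but it cannot prove that an identity is correct for all possible inputs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper: tests accumulator(identity, u).equals(u) for each sample u.
class IdentityCheck {

    static <T> boolean isIdentityFor(T identity, BinaryOperator<T> accumulator, List<T> samples) {
        for (T u : samples) {
            if (!accumulator.apply(identity, u).equals(u)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        BinaryOperator<Integer> plus = (x, y) -> x + y;
        System.out.println(isIdentityFor(10, plus, data)); // false
        System.out.println(isIdentityFor(0, plus, data));  // true
        // With the real identity 0, the parallel sum matches the sequential one.
        System.out.println(data.parallelStream().reduce(0, plus)); // 21
    }
}
```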
And a second example:
/**
* count letters, adding a bit more all the time
*/
private static int howMany(List<String> tokens) {
return tokens.stream()
.parallel()
.reduce(0, // identity
(i, s) -> { // accumulator
return s.length() + i;
}, (left, right) -> { // combiner
return left + right + left; // notice the extra left here
});
}
And you call it with:
List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");
System.out.println(howMany(left)); // 38 on my run
System.out.println(howMany(right)); // 50 on my run
Basically, you broke this rule: "Additionally, the combiner function must be compatible with the accumulator function."
Or, in code:
// this must hold!
// combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
Integer identity = 0;
String t = "aa";
Integer u = 3; // the partial result for "bbb"
BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
BinaryOperator<Integer> combiner = (left, right) -> left + right + left;
int first = accumulator.apply(identity, t); // 0 + 2 = 2
int second = combiner.apply(u, first); // 3 + 2 + 3 = 8
int expected = accumulator.apply(u, t); // 3 + 2 = 5
System.out.println(expected == second); // false: 5 != 8
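To generalize the single check above, the sketch below tests the compatibility rule for every pair of sample partial results u and elements t, and also shows howMany with the extra left removed. The class CombinerCompatibility, the constants, and howManyFixed are hypothetical names; the accumulator is the one from the example. With plain addition as the combiner, both lists should give 20 regardless of how the stream is split.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical helper around the accumulator from the howMany example.
class CombinerCompatibility {

    static final Integer IDENTITY = 0;
    static final BiFunction<Integer, String, Integer> ACCUMULATOR = (i, s) -> i + s.length();

    // Checks combiner.apply(u, accumulator.apply(identity, t)) equals accumulator.apply(u, t)
    // for every u and t in the samples.
    static boolean isCompatible(BinaryOperator<Integer> combiner, List<Integer> us, List<String> ts) {
        for (Integer u : us) {
            for (String t : ts) {
                Integer viaCombiner = combiner.apply(u, ACCUMULATOR.apply(IDENTITY, t));
                Integer direct = ACCUMULATOR.apply(u, t);
                if (!viaCombiner.equals(direct)) {
                    return false;
                }
            }
        }
        return true;
    }

    // howMany with the extra "left" removed: the combiner is plain addition.
    static int howManyFixed(List<String> tokens) {
        return tokens.parallelStream().reduce(IDENTITY, ACCUMULATOR, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> us = Arrays.asList(0, 3, 7);
        List<String> ts = Arrays.asList("aa", "bbb", "");
        System.out.println(isCompatible((left, right) -> left + right + left, us, ts)); // false
        System.out.println(isCompatible(Integer::sum, us, ts));                         // true
        System.out.println(howManyFixed(Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee")));     // 20
        System.out.println(howManyFixed(Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", ""))); // 20
    }
}
```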