How does aggregate work in Scala?

I know how a regular aggregate works in Scala and its use with folds. I tried hard to understand how the code below works, but couldn't. Could anyone explain how it works and why it gives the result (10,4)?

val input = List(1, 2, 3, 4)
val result = input.aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

      



1 answer


Could anyone explain how it works and why it gives the result (10,4)?

When using aggregate, you provide three parameters:

  • the initial value from which each partition starts accumulating its items, often a neutral element
  • a function that accumulates the items of a partition into that partition's result
  • a function that combines the results of two partitions
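The three parameters listed above can be given names to make their roles visible. This is a minimal sketch: the names `AggregateParts`, `Acc`, `zero`, `seqop` and `combop` are illustrative and not part of the question's code. On Scala 2.13 and later, calling `aggregate` on a sequential collection should compile with a deprecation warning that points to `foldLeft`.

```scala
object AggregateParts {
  // (running sum, running count); an alias introduced for readability only.
  type Acc = (Int, Int)

  // 1. Initial value: neutral for both the sum and the count.
  val zero: Acc = (0, 0)

  // 2. Accumulates one element into a partition's partial result.
  val seqop: (Acc, Int) => Acc =
    (acc, value) => (acc._1 + value, acc._2 + 1)

  // 3. Merges the partial results of two partitions.
  val combop: (Acc, Acc) => Acc =
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

  val result: Acc = List(1, 2, 3, 4).aggregate(zero)(seqop, combop)

  def main(args: Array[String]): Unit = println(result) // prints (10,4)
}
```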

So, in your case, the initial value for each partition is the tuple (0, 0).

Then the accumulator function you define adds the current element to the first element of the tuple and increments the second element of the tuple by one. In effect, it computes the sum of the items in the partition and their count.

The combine function merges two tuples. As you defined it, it adds together the sums and the item counts of two partitions. It is not used in your case because the list is traversed sequentially. You can call .par on the list to get a parallel implementation and see the combiner in action (note that it must be an associative function). On Scala 2.13 and later, .par requires the separate scala-parallel-collections module.
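To see what the combiner does without a parallel collection, the two steps can be done by hand. This sketch assumes a split into two halves, which is only an illustration: a real parallel collection chooses its own chunking. The names `CombinerByHand`, `leftPartial` and `rightPartial` are hypothetical.

```scala
object CombinerByHand {
  type Acc = (Int, Int) // (sum, count)

  val seqop: (Acc, Int) => Acc = (acc, v) => (acc._1 + v, acc._2 + 1)
  val combop: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Assumed split point; a parallel collection decides this itself.
  val (left, right) = List(1, 2, 3, 4).splitAt(2)

  // Each half is accumulated independently, starting from the initial value.
  val leftPartial: Acc = left.foldLeft((0, 0))(seqop)   // (3,2)
  val rightPartial: Acc = right.foldLeft((0, 0))(seqop) // (7,2)

  // The combiner merges the two partial results.
  val combined: Acc = combop(leftPartial, rightPartial) // (10,4)

  // Associativity on sample values: the grouping of merges does not matter.
  val a: Acc = (1, 1); val b: Acc = (2, 1); val c: Acc = (7, 2)
  val associativeOnSample: Boolean =
    combop(combop(a, b), c) == combop(a, combop(b, c))
}
```

Because the combiner is associative, the partial results can be merged in whatever grouping the runtime happens to produce.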

So you get (10, 4) because 1 + 2 + 3 + 4 = 10, and there were 4 items in the list (you made 4 additions).

You can add a print statement to the accumulator function (running on serial input) to see how it behaves. The output is:



Acc: (0,0) - value:1
Acc: (1,1) - value:2
Acc: (3,2) - value:3
Acc: (6,3) - value:4
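A trace like the one above can be produced by a sketch along these lines. It reuses the code from the question and adds only a println to the accumulator; the object name `AggregateTrace` is illustrative.

```scala
object AggregateTrace {
  val input = List(1, 2, 3, 4)

  val result: (Int, Int) = input.aggregate((0, 0))(
    (acc, value) => {
      // Show the accumulator before the current element is folded in.
      println(s"Acc: $acc - value:$value")
      (acc._1 + value, acc._2 + 1)
    },
    // Not invoked on a sequential List.
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )

  def main(args: Array[String]): Unit = println(result)
}
```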

      

I know how a regular aggregate works in Scala and its use with folds.

For serial input, aggregate is foldLeft:

def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)

      

For parallel input, the list is split into chunks so that multiple threads can work separately. The accumulator function runs on each chunk, starting from the initial value. When two threads need to merge their results, the combine function is used:

def aggregate[S](z: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
  tasksupport.executeAndWaitResult(new Aggregate(() => z, seqop, combop, splitter))
}

      

This is the principle of the fork-join model, but it requires your task to parallelize well. That is the case here, because a thread doesn't need to know the result of another thread to do its job.
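The split, accumulate and combine steps can be imitated sequentially to check that the chunking does not change the answer. This is only a stand-in for the parallel implementation, not how the library schedules work; `chunkedAggregate` is a hypothetical helper written for this illustration.

```scala
object ChunkedAggregate {
  type Acc = (Int, Int) // (sum, count)

  val seqop: (Acc, Int) => Acc = (acc, v) => (acc._1 + v, acc._2 + 1)
  val combop: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Hypothetical helper: sequential imitation of split / accumulate / combine.
  // Assumes xs is non-empty, since reduce fails on an empty iterator.
  def chunkedAggregate(xs: List[Int], chunkSize: Int): Acc =
    xs.grouped(chunkSize)              // split into chunks
      .map(_.foldLeft((0, 0))(seqop))  // each chunk starts from the initial value
      .reduce(combop)                  // merge the partial results

  // Chunk sizes 1 to 4 all give the same (sum, count).
  val results: Seq[Acc] = (1 to 4).map(chunkedAggregate(List(1, 2, 3, 4), _))

  def main(args: Array[String]): Unit = results.foreach(println)
}
```

Every chunk size yields (10,4) because each chunk's partial result depends only on its own elements, the initial value is neutral, and the combiner is associative.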
