Spark RDD.aggregate vs RDD.reduceByKey?

I also have an RDD [String] containing one word per line. The size is currently very small, 10-20K lines, but the goal is to scale it to hundreds of millions of lines. The problem I am having is that doing the map / reduceByKey operation makes it surprisingly long even for this small dataset. I am running the following:

val wordcount = filtered.map(w => (w,1)).reduceByKey(_ + _)

      

and for 16780 lines - 12321ms on a 2GHz i7 8GB RAM. I found that there is a method called aggregate that can be more memory efficient and therefore faster:

aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

      

I cannot figure out how to implement this in my case. I am guessing it should be something like

aggregate(collection.immutable.Map)(??)

      

So my questions

1) Does it make sense to use aggregate instead of reduceByKey

2) If so, how will this be implemented?

+3


source to share


2 answers


I guess the fastest would be countByValue

:

Returns the score of each unique value in this RDD as a map of pairs (numbers, numbers). The final merging step occurs locally on the host computer, which is equivalent to running one pruning task.

Usage is trivial:

val wordcount = filtered.countByValue

      



The implementation of this method should answer the second question :)

By the way, reduceByKey

it shouldn't last that long. It looks like the pre-computation (i.e. Filtering) takes up most of those 12 seconds. To check this, persist

RDD before counting:

val persisted = filtered.persist
val wordcount = persisted.countByValue

      

+5


source


countByValue

would be the fastest way to do this, however its implementation uses hash maps and concatenates them, so if you have a large amount of data, this approach may not scale very well (especially when you consider how many releases with memory). You could use the standard way of counting in decreasing the map, which was to display line and 1 as pairs, and then reduceBykey

like this:

val wordCount = filtered.map((_,1)).reduceByKey(_+_).collect()

      



You might also consider using countByValueApprox

it generally when dealing with data, this large approximation will generally be a good enough and by far the most efficient approach (although it still uses hash maps, so with many unique words you you can still fail). You might consider using this if you are unable to start countByValue

.

0


source







All Articles