Avoid shuffling with reduceByKey in Spark
I'm taking a Coursera course on Scala and Spark, and I'm trying to optimize this snippet:
val indexedMeansG = vectors.
map(v => findClosest(v, means) -> v).
groupByKey.mapValues(averageVectors)
vectors is an RDD[(Int, Int)].
To see the list of dependencies and the lineage of the RDD, I used:
println(s"""GroupBy:
| Deps: ${indexedMeansG.dependencies.size}
| Deps: ${indexedMeansG.dependencies}
| Lineage: ${indexedMeansG.toDebugString}""".stripMargin)
Which shows this:
/* GroupBy:
* Deps: 1
* Deps: List(org.apache.spark.OneToOneDependency@44d1924)
* Lineage: (6) MapPartitionsRDD[18] at mapValues at StackOverflow.scala:207 []
* ShuffledRDD[17] at groupByKey at StackOverflow.scala:207 []
* +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:206 []
* MapPartitionsRDD[13] at map at StackOverflow.scala:139 []
* CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
* MapPartitionsRDD[12] at values at StackOverflow.scala:116 []
* MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []
* MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []
* MapPartitionsRDD[9] at join at StackOverflow.scala:91 []
* MapPartitionsRDD[8] at join at StackOverflow.scala:91 []
* CoGroupedRDD[7] at join at StackOverflow.scala:91 []
* +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []
* | MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []
* | MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
* | src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
* | src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []
* +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []
* MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []
* MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
* src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
* src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */
From this List(org.apache.spark.OneToOneDependency@44d1924)
I understood that no shuffling is being done. Is that right? However, further down ShuffledRDD[17]
is printed, which means there actually is a shuffle going on.
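If I understand it correctly, dependencies only describes the link to the immediate parent: the final mapValues is a narrow transformation, so it reports a OneToOneDependency even though the shuffle happens one step earlier. A quick check (just a sketch) seems to confirm this:
// dependencies covers only the last step (mapValues), which is narrow
println(indexedMeansG.dependencies)
// -> List(org.apache.spark.OneToOneDependency@...)
// one step up the lineage, the shuffle introduced by groupByKey appears
println(indexedMeansG.dependencies.head.rdd.dependencies)
// -> List(org.apache.spark.ShuffleDependency@...)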
I tried replacing the groupByKey call with reduceByKey, for example:
val indexedMeansR = vectors.
map(v => findClosest(v, means) -> v).
reduceByKey((a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)
And its dependencies and lineage:
/* ReduceBy:
* Deps: 1
* Deps: List(org.apache.spark.ShuffleDependency@4d5e813f)
* Lineage: (6) ShuffledRDD[17] at reduceByKey at StackOverflow.scala:211 []
* +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:210 []
* MapPartitionsRDD[13] at map at StackOverflow.scala:139 []
* CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
* MapPartitionsRDD[12] at values at StackOverflow.scala:116 []
* MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []
* MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []
* MapPartitionsRDD[9] at join at StackOverflow.scala:91 []
* MapPartitionsRDD[8] at join at StackOverflow.scala:91 []
* CoGroupedRDD[7] at join at StackOverflow.scala:91 []
* +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []
* | MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []
* | MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
* | src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
* | src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []
* +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []
* MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []
* MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
* src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
* src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */
Interestingly, this time the dependency is a ShuffleDependency, and I can't figure out why.
Since the RDD is a pair RDD whose keys are Ints, and therefore have an ordering, I also tried changing the partitioner and using a RangePartitioner, but it didn't improve things.
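For reference, what I tried was roughly this (a sketch, not my exact code; 6 is just the partition count from the lineage above):
import org.apache.spark.RangePartitioner

val keyed = vectors.map(v => findClosest(v, means) -> v)

// Int keys have an Ordering, so a RangePartitioner can be built.
// Note that constructing it already triggers a sampling job.
val partitioner = new RangePartitioner(6, keyed)

// The map above discards any existing partitioner on vectors,
// so this reduceByKey still has to shuffle into the target partitioner.
val indexedMeansP = keyed.reduceByKey(
  partitioner,
  (a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)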
A reduceByKey still involves a shuffle, because it still requires all items with the same key to end up in the same partition.
However, the shuffle will be much smaller than with a groupByKey. A reduceByKey performs the reduce operation on each partition before shuffling (a map-side combine), thereby reducing the amount of data that needs to be shuffled.
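As a side note, the reduction function in the question, (a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2, does not compute a true mean: the function passed to reduceByKey must be associative (partial results are combined in an arbitrary order), and pairwise averaging is not. A common pattern, sketched below under the assumption that averageVectors computes the component-wise mean, is to reduce to sums plus a count and divide once at the end:
// reduce to per-key (sumX, sumY, count); this reducer is associative
// and commutative, so it is safe for reduceByKey and benefits fully
// from the map-side combine
val indexedMeans = vectors.
  map(v => findClosest(v, means) -> (v._1.toLong, v._2.toLong, 1L)).
  reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)).
  mapValues { case (sumX, sumY, count) => ((sumX / count).toInt, (sumY / count).toInt) }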