Avoid shuffling with reduceByKey in Spark

I am taking the Scala Spark course on Coursera, and I am trying to optimize this snippet:

val indexedMeansG = vectors.
  map(v => findClosest(v, means) -> v).
  groupByKey.mapValues(averageVectors)

where vectors is an RDD[(Int, Int)].

To see the list of dependencies and the lineage of the RDD, I used:

println(s"""GroupBy:
           | Deps: ${indexedMeansG.dependencies.size}
           | Deps: ${indexedMeansG.dependencies}
           | Lineage: ${indexedMeansG.toDebugString}""".stripMargin)

Which shows this:

/* GroupBy:                                                                                                                                                                                  
   * Deps: 1                                                                                                                                                                                      
   * Deps: List(org.apache.spark.OneToOneDependency@44d1924)                                                                                                                                      
   * Lineage: (6) MapPartitionsRDD[18] at mapValues at StackOverflow.scala:207 []                                                                                                                 
   *  ShuffledRDD[17] at groupByKey at StackOverflow.scala:207 []                                                                                                                                 
   * +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:206 []                                                                                                                              
   *  MapPartitionsRDD[13] at map at StackOverflow.scala:139 []                                                                                                                                   
   *      CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B                                                                                                
   *  MapPartitionsRDD[12] at values at StackOverflow.scala:116 []                                                                                                                                
   *  MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []                                                                                                                             
   *  MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []                                                                                                                             
   *  MapPartitionsRDD[9] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  MapPartitionsRDD[8] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  CoGroupedRDD[7] at join at StackOverflow.scala:91 []                                                                                                                                        
   *    +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []                                                                                                                             
   *  |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []                                                                                                                               
   *  |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                  
   *  |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                          
   *  |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []                                                                                 
   *    +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []                                                                                                                             
   *  MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []                                                                                                                                  
   *  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                     
   *  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                             
   *  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */


From this List(org.apache.spark.OneToOneDependency@44d1924) I understood that no shuffling is being done — right? However, ShuffledRDD[17] is printed below, which means that a shuffle actually takes place.

I tried replacing the groupByKey call with reduceByKey, for example:

val indexedMeansR = vectors.
  map(v => findClosest(v, means) -> v).
  reduceByKey((a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)

And its dependencies and lineage:

/* ReduceBy:                                                                                                                                                                                 
   * Deps: 1                                                                                                                                                                                      
   * Deps: List(org.apache.spark.ShuffleDependency@4d5e813f)                                                                                                                                      
   * Lineage: (6) ShuffledRDD[17] at reduceByKey at StackOverflow.scala:211 []                                                                                                                    
   * +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:210 []                                                                                                                              
   *  MapPartitionsRDD[13] at map at StackOverflow.scala:139 []                                                                                                                                   
   *      CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B                                                                                                
   *  MapPartitionsRDD[12] at values at StackOverflow.scala:116 []                                                                                                                                
   *  MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []                                                                                                                             
   *  MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []                                                                                                                             
   *  MapPartitionsRDD[9] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  MapPartitionsRDD[8] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  CoGroupedRDD[7] at join at StackOverflow.scala:91 []                                                                                                                                        
   *    +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []                                                                                                                             
   *  |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []                                                                                                                               
   *  |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                  
   *  |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                          
   *  |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []                                                                                 
   *    +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []                                                                                                                             
   *  MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []                                                                                                                                  
   *  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                     
   *  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                             
   *  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */


What's more, this time the dependency is a ShuffleDependency, and I can't figure out why.

Since the RDD is a pair RDD whose keys are Ints, and therefore have an ordering, I also tried changing the partitioner and using a RangePartitioner, but it did not improve anything.

1 answer


A reduceByKey operation still involves a shuffle, because it still requires all the items with the same key to end up in the same partition.



However, it will shuffle far less data than a groupByKey. A reduceByKey performs the reduce operation locally on each partition before the shuffle (a map-side combine), thereby reducing the amount of data that needs to be transferred.
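One caveat about the attempt in the question: the function passed to reduceByKey there, (a._1 + b._1) / 2 -> (a._2 + b._2) / 2, is not associative — a running pairwise average is not the true average — so its result depends on the order in which Spark combines values. The usual fix is to reduce with sums and counts, which are associative, and divide once at the end. Below is a minimal sketch of that pattern in plain Scala (no SparkContext needed to follow the logic; the corresponding Spark calls are shown only as comments, reusing findClosest and means from the question):

```scala
object AverageByKey {
  // Accumulator: (sum of x, sum of y, count). Addition is associative and
  // commutative, so reduceByKey can safely pre-aggregate it map-side.
  type Acc = (Long, Long, Long)

  def toAcc(v: (Int, Int)): Acc = (v._1.toLong, v._2.toLong, 1L)

  def merge(a: Acc, b: Acc): Acc =
    (a._1 + b._1, a._2 + b._2, a._3 + b._3)

  def finish(a: Acc): (Int, Int) =
    ((a._1 / a._3).toInt, (a._2 / a._3).toInt)

  // With Spark, the same idea would read (sketch only, not run here):
  //   vectors.map(v => findClosest(v, means) -> toAcc(v))
  //          .reduceByKey(merge)   // partial sums combined before the shuffle
  //          .mapValues(finish)

  def main(args: Array[String]): Unit = {
    val vs = Seq((1, 3), (3, 5), (5, 7))
    // Component-wise average of the three vectors: (3, 5)
    println(finish(vs.map(toAcc).reduce(merge)))
  }
}
```

Because merge is associative, the partial sums computed on each partition before the shuffle combine into the same total regardless of partitioning, which is exactly what makes the map-side combine safe.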
