Spark RDD.isEmpty costs a lot of time

I have built a Spark cluster:

Workers: 2
Cores: 12
Memory: 32.0 GB total, 20.0 GB used
Each worker gets 1 processor, 6 cores, and 10.0 GB of memory.

My program reads its data from a MongoDB cluster. Spark and the MongoDB cluster are in the same LAN (1000 Mbps). The MongoDB document format is {name: string, value: double, time: ISODate}, and there are about 13 million documents.

I want to get the average value for a specific name over a specific hour, which contains 60 documents. Here is my key function:

/*
 * rdd = sc.newAPIHadoopRDD(configOriginal, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
 * Apache-Spark-1.3.1 Scala doc: SparkContext.newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
 */
def findValueByNameAndRange(rdd: RDD[(Object, BSONObject)], name: String, time: Date): RDD[BasicBSONObject] = {
  // Keep only the documents with the requested name
  val nameRdd = rdd.map(_._2).filter(_.get("name").equals(name))
  // Pair each document with its timestamp
  val timeRangeRdd1 = nameRdd.map(doc => (doc, doc.get("time").asInstanceOf[Date]))
  // Flag documents whose timestamp falls in the requested one-hour window
  val timeRangeRdd2 = timeRangeRdd1.map(tuple => (tuple._1, duringTime(tuple._2, time, getHourAgo(time, 1))))
  // Drop the flag, keeping only the matching documents
  val timeRangeRdd3 = timeRangeRdd2.filter(_._2).map(_._1)
  // Sum the values per name
  val timeRangeRdd4 = timeRangeRdd3
    .map(x => (x.get("name").toString, x.get("value").toString.toDouble))
    .reduceByKey(_ + _)

  if (timeRangeRdd4.isEmpty()) {
    basicBSONRDD(name, time)
  } else {
    timeRangeRdd4.map { tuple =>
      val bson = new BasicBSONObject()
      bson.put("name", tuple._1)
      bson.put("value", tuple._2 / 60)
      bson.put("time", time)
      bson
    }
  }
}
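(For reference, a configuration like configOriginal is typically built as sketched below with the mongo-hadoop connector; the master URL, host, database, and collection names here are placeholders, not my real ones.)

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.bson.BSONObject

    // Placeholder mongo-hadoop setup; "mongo-host", "mydb" and "mycollection" are examples.
    val sc = new SparkContext("spark://master:7077", "mongo-average-example")
    val configOriginal = new Configuration()
    configOriginal.set("mongo.input.uri", "mongodb://mongo-host:27017/mydb.mycollection")

    // Same call as in the comment above: one (key, BSONObject) pair per MongoDB document.
    val rdd: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
      configOriginal,
      classOf[com.mongodb.hadoop.MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])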

      

Here is some information about the job:

[Spark UI screenshots of the job and its stages]

My program is very slow. Is it because of isEmpty and reduceByKey? If so, how can I improve it? If not, why not?
======= update =======

    timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)

is on line 34.

[Spark UI screenshot of the corresponding stage]

I know reduceByKey is a global operation and can be time-consuming, but the cost here is beyond my budget. How can I improve it, or is it a Spark defect? With the same computation and hardware, it only takes a few seconds if I use multiple Java threads.
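(For readability, the chain of intermediate RDDs in findValueByNameAndRange is equivalent to a single filter followed by the same map + reduceByKey; the sketch below only restates my logic in that shape, assuming the same rdd, name, time, duringTime, and getHourAgo as above.)

    // Equivalent single-pass reformulation of the pipeline in findValueByNameAndRange:
    // one filter combining the name check and the time-window check, then the same
    // map + reduceByKey over (name, value) pairs.
    val summed = rdd
      .map(_._2)
      .filter { doc =>
        doc.get("name").equals(name) &&
          duringTime(doc.get("time").asInstanceOf[Date], time, getHourAgo(time, 1))
      }
      .map(doc => (doc.get("name").toString, doc.get("value").toString.toDouble))
      .reduceByKey(_ + _)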


1 answer


First, isEmpty is simply the point at which the RDD's stage ends. map and filter do not create a need for a shuffle, and the method shown in the UI is always the one that triggers the stage change/shuffle... in this case, isEmpty. Why it runs slowly is not easy to discern from this perspective, especially without seeing the composition of the originating RDD. What I can tell you is that isEmpty first checks the partition count and then executes take(1), checking whether any data was returned or not. So the chances are there is a bottleneck in the network or something else blocking along the way. It might even be a GC issue... Click into isEmpty and see what else you can discern from there.
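To make that concrete, here is a minimal sketch of the check isEmpty performs, based on the Spark 1.x RDD source; looksEmpty is just an illustrative stand-in, not the actual method definition:

    import org.apache.spark.rdd.RDD

    // Roughly what RDD.isEmpty does in Spark 1.x: if the RDD has any partitions,
    // it runs a take(1) job to see whether a single record comes back. That
    // take(1) job materializes everything upstream of it, which is why the whole
    // pipeline's cost shows up under "isEmpty" in the UI.
    def looksEmpty[T](rdd: RDD[T]): Boolean =
      rdd.partitions.length == 0 || rdd.take(1).length == 0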


