Spark RDD.isEmpty costs a lot of time
I have built a Spark cluster:

Workers: 2
Cores: 12
Memory: 32.0 GB total, 20.0 GB used
Each worker gets 1 processor, 6 cores and 10.0 GB of memory

My program reads its data from a MongoDB cluster. Spark and the MongoDB cluster are on the same LAN (1000 Mbps).
The MongoDB document format is:

{name: string, value: double, time: ISODate}

There are about 13 million documents. I want to compute the average value for a given name over a specific one-hour window, which contains about 60 documents. Here is my key function:
/*
 * rdd = sc.newAPIHadoopRDD(configOriginal, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
 * Apache-Spark-1.3.1 Scala doc: SparkContext.newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
 */
def findValueByNameAndRange(rdd: RDD[(Object, BSONObject)], name: String, time: Date): RDD[BasicBSONObject] = {
  // Keep only documents with the requested name
  val nameRdd = rdd.map(_._2).filter(_.get("name").equals(name))
  // Pair each document with its timestamp
  val timeRangeRdd1 = nameRdd.map(doc => (doc, doc.get("time").asInstanceOf[Date]))
  // Flag documents that fall inside the one-hour window
  val timeRangeRdd2 = timeRangeRdd1.map { case (doc, t) => (doc, duringTime(t, time, getHourAgo(time, 1))) }
  val timeRangeRdd3 = timeRangeRdd2.filter(_._2).map(_._1)
  // Sum the values per name (the shuffle step)
  val timeRangeRdd4 = timeRangeRdd3
    .map(x => (x.get("name").toString, x.get("value").toString.toDouble))
    .reduceByKey(_ + _)

  if (timeRangeRdd4.isEmpty()) {
    basicBSONRDD(name, time)
  } else {
    timeRangeRdd4.map { case (n, sum) =>
      val bson = new BasicBSONObject()
      bson.put("name", n)
      bson.put("value", sum / 60)
      bson.put("time", time)
      bson
    }
  }
}
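For context, one restructuring I have considered: since timeRangeRdd4 holds at most one key, the result could be collected to the driver in a single job instead of running one job for isEmpty and a second one for the final action. This is only a sketch, assuming duringTime, getHourAgo, basicBSONRDD, and sc are in scope exactly as in the original function:

def findValueCollected(rdd: RDD[(Object, BSONObject)], name: String, time: Date): RDD[BasicBSONObject] = {
  // Combine the filters into one pass and reduce, then bring the
  // tiny result (at most one key) back to the driver with collect().
  val sums = rdd
    .map(_._2)
    .filter(doc => doc.get("name").equals(name) &&
      duringTime(doc.get("time").asInstanceOf[Date], time, getHourAgo(time, 1)))
    .map(doc => (doc.get("name").toString, doc.get("value").toString.toDouble))
    .reduceByKey(_ + _)
    .collect()                       // one job over the input

  if (sums.isEmpty) basicBSONRDD(name, time)
  else sc.parallelize(sums.map { case (n, sum) =>
    val bson = new BasicBSONObject()
    bson.put("name", n)
    bson.put("value", sum / 60)
    bson.put("time", time)
    bson
  })
}

I don't know whether this is the idiomatic Spark way, but it avoids scanning the RDD twice.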
My program is very slow. Is that because of isEmpty and reduceByKey? If so, how can I improve it? If not, why not?
======= update =======
The line

timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)

is the reduceByKey step in the function above. I know reduceByKey is a global operation and can be time-consuming, but the cost here is beyond my budget. How can I improve it, or is this a Spark defect? With the same computation and hardware, it takes only a few seconds when I use plain Java threads.
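One thing that might matter more than reduceByKey: the pipeline above scans all ~13 million documents in Spark even though only ~60 can match. The mongo-hadoop connector can push a query down to MongoDB via its mongo.input.query setting, so the filtering happens server-side with an index. A sketch, where the URI, database, collection, and query value are placeholders for illustration:

import org.apache.hadoop.conf.Configuration

val config = new Configuration()
// Placeholder connection string; replace host/db/collection with yours.
config.set("mongo.input.uri", "mongodb://host:27017/db.collection")
// Push the name filter down to MongoDB so Spark never reads
// documents that cannot match.
config.set("mongo.input.query", s"""{"name": "$name"}""")

val rdd = sc.newAPIHadoopRDD(config,
  classOf[com.mongodb.hadoop.MongoInputFormat],
  classOf[Object], classOf[org.bson.BSONObject])

Whether the time-range predicate can also be pushed down depends on how the time field is indexed; that part is an assumption to verify against the connector's documentation.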
First, isEmpty is simply the point at which the RDD's stage ends. map and filter do not create the need for a shuffle, and the method shown in the UI is always the one that triggers the stage change / action... in this case isEmpty. Why it runs slowly is not easy to discern from that perspective alone, especially without seeing the composition of the original RDD. I can tell you that isEmpty first checks the partition count and then executes a take(1), checking whether any data was returned. So the chances are there is a network bottleneck or something else blocking along the way. It might even be a GC issue... Click into isEmpty in the UI and see what else you can distinguish from there.
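The behavior described above corresponds roughly to the following sketch (paraphrased from the Spark source, not a verbatim copy):

// If the RDD has no partitions it is trivially empty; otherwise
// take(1) launches a job that reads just enough data to fetch one
// element, and the RDD is empty iff nothing comes back.
def isEmptySketch[T](rdd: RDD[T]): Boolean =
  rdd.partitions.length == 0 || rdd.take(1).length == 0

So isEmpty is cheap when the first partition yields a match quickly, but if take(1) has to scan many partitions before finding data, it becomes a full job in its own right.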