Apache Spark: unexpected filtering results
I am working with Apache Spark v1.2 in local mode. I created an RDD and cached it in memory. The Spark web UI shows that 85% of this RDD is held in memory. Each record has a field (index 14) whose value is 0 or 1, as demonstrated by the result I got from running the code below:
In[96]: flagged.map(lambda x:(x[14],1)).reduceByKey(lambda x,y:x+y).collect()
Out[96]: [(0, 637981), (1, 272958)]
Also, when I run flagged.count() the result is the sum of the two values, i.e. 637981 + 272958 = 910939.
Now, when I run a filter based on this, I don't get the same counts:
In[97]: flagged.filter(lambda x: x[14]==0).count()
Out[97]: 637344
In[98]: flagged.filter(lambda x: x[14]==1).count()
Out[98]: 272988
I am trying to understand why the counts obtained from the filtered RDDs do not match the values returned by reduceByKey.
Use the MEMORY_AND_DISK storage level when persisting the RDD:
rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
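Since you are using PySpark, the equivalent there would be something along these lines (a minimal sketch using the standard pyspark StorageLevel import; flagged is your RDD):
from pyspark import StorageLevel

# Partitions that do not fit in memory are spilled to disk instead of
# being dropped, so they never have to be recomputed.
flagged.persist(StorageLevel.MEMORY_AND_DISK)
flagged.count()  # materialize the cache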
For a moment I suspected this was a bug, so I put together a sample job, and it looks like you are right:
val count3 = sc.parallelize(1 to 1000000).map(r => {
  (new java.util.Random().nextInt(2), 1)
})
count3.reduceByKey(_+_).collect
res10: Array[(Int, Int)] = Array((0,500201), (1,499799))
count3.filter(r => r._1==0).count
res13: Long = 499613
count3.filter(r => r._1==1).count
res14: Long = 500143
But then I changed my code to
val count3 = sc.parallelize(1 to 1000000).map(r => {
  (new java.util.Random().nextInt(2), 1)
}).persist()
count3.count
Note that this time I persisted the RDD (and I was able to cache 100% of it):
count3.reduceByKey(_+_).collect
res27: Array[(Int, Int)] = Array((0,500048), (1,499952))
count3.filter(r => r._1==0).count
res28: Long = 500048
count3.filter(r => r._1==1).count
res29: Long = 499952
I think that when you generate an RDD and then cache it, the default storage level is MEMORY_ONLY. The problem is that only 85% of your RDD fits in memory, which means the remaining 15% is recomputed on demand whenever it is needed. If you use any random (or otherwise non-deterministic) function when generating the RDD, that 15% of the data can change on each recomputation, which is why your filter counts do not match the reduceByKey totals.
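As an illustration only (a small sketch, not your actual pipeline), here is a PySpark job that reproduces the effect with a randomly generated flag and shows the counts lining up once the RDD is fully persisted with MEMORY_AND_DISK:
from pyspark import SparkContext, StorageLevel
import random

sc = SparkContext("local[*]", "nondeterministic-cache-demo")

# Each record gets a randomly generated 0/1 flag, mimicking a non-deterministic field.
flagged = sc.parallelize(range(1000000)).map(lambda _: (random.randint(0, 1), 1))

# MEMORY_AND_DISK spills partitions that do not fit in memory to disk,
# so no partition is ever recomputed (and re-randomized).
flagged.persist(StorageLevel.MEMORY_AND_DISK)
flagged.count()  # materialize the cache

totals = flagged.reduceByKey(lambda x, y: x + y).collect()
zeros = flagged.filter(lambda x: x[0] == 0).count()
ones = flagged.filter(lambda x: x[0] == 1).count()

# With the RDD fully persisted, the filter counts match the reduceByKey totals.
print(totals, zeros, ones)
Another option, if you cannot afford to persist the whole RDD, is to make the flag deterministic (for example, derive it from a hash of the record) so that recomputed partitions produce the same values.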