Apache Spark: unexpected filtering results

I am working with Apache Spark 1.2 in local mode. I created an RDD and cached it in memory; the Spark web UI shows that 85% of this RDD is held in memory. I have a field (variable) in the RDD whose value is 0 or 1, as demonstrated by the result of the script below:

In[96]: flagged.map(lambda x:(x[14],1)).reduceByKey(lambda x,y:x+y).collect()

Out[96]: [(0, 637981), (1, 272958)]


Also, when I do flagged.count(), the number is the sum of the two values, i.e. 637981 + 272958 = 910939.

Now, when I run a filter based on this, I don't get the same counts:

In[97]:  flagged.filter(lambda x: x[14]==0).count()

Out[97]:  637344

In[97]:  flagged.filter(lambda x: x[14]==1).count()

Out[97]:  272988


I am trying to understand why the counts obtained from the filtered RDDs do not match the values from the reduceByKey result.





1 answer


Use the MEMORY_AND_DISK storage level:

rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
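Since the question uses PySpark, a rough equivalent there would be the sketch below. It assumes flagged has not already been persisted (Spark does not let you change the storage level of an RDD that already has one) and that sc is the SparkContext from the shell.

from pyspark import StorageLevel

# Keep the partitions that do not fit in memory on disk
# instead of recomputing them later.
flagged.persist(StorageLevel.MEMORY_AND_DISK)
flagged.count()  # materialize the cache before running the counts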


For a moment I suspected this was a mistake, so I ran a sample job myself, and it looks like you are right.

  val count3 = sc.parallelize(1 to 1000000).map(r => {
    (new java.util.Random().nextInt(2), 1)
  })


 count3.reduceByKey(_+_).collect


res10: Array[(Int, Int)] = Array((0,500201), (1,499799))

 count3.filter(r => r._1==0).count


res13: Long = 499613

 count3.filter(r => r._1==1).count


res14: Long = 500143

But then I changed my code to



 val count3 = sc.parallelize(1 to 1000000).map(r => {
    (new java.util.Random().nextInt(2), 1)
  }).persist()
count3.count  


Please note that this time I added persist() (and I was able to cache 100% of that RDD).

count3.reduceByKey(_+_).collect


res27: Array[(Int, Int)] = Array((0,500048), (1,499952))

 count3.filter(r => r._1==0).count


res28: Long = 500048

 count3.filter(r => r._1==1).count


res29: Long = 499952

I think you generate an RDD and then persist it; the default storage level is MEMORY_ONLY. The problem is that only 85% of the RDD fits in memory, which means the remaining 15% has to be recomputed on demand. If you use any random function when generating the RDD, that 15% of the data may change on each recomputation.
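As a PySpark sketch of two ways around this (the flag_row helper and the seeding-by-hash trick are illustrative assumptions, not taken from the question): persist with a storage level that spills to disk, or derive the flag deterministically from the row so a recomputed partition yields the same values.

import random
from pyspark import StorageLevel

# Illustrative: seed the generator from the row itself, so recomputing
# a partition reproduces exactly the same 0/1 flags.
def flag_row(x):
    rng = random.Random(hash(x))
    return (rng.randint(0, 1), 1)

flagged2 = sc.parallelize(range(1, 1000001)) \
    .map(flag_row) \
    .persist(StorageLevel.MEMORY_AND_DISK)
flagged2.count()

With either change the reduceByKey totals and the filter counts should agree.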









