How to filter an RDD according to a function based on another RDD in Spark?

I am an Apache Spark newbie. I want to keep only the groups whose weight sum is greater than a constant value. The weights are also stored in an RDD. Here is a small demo: the groups to filter are stored in "groups", and the constant value is 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

      

When the input is very large (> 10 GB, for example), I always run into a "java heap out of memory" error. I suspected this was caused by "weights.toArray.toMap", because it converts a distributed RDD into a Java object in the JVM. So I tried to filter the RDD directly:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

      

When I ran result.collect after loading this script into the Spark shell, I got a "java.lang.NullPointerException" error. Someone told me that when an RDD is manipulated inside another RDD there will be a NullPointerException, and suggested that I put the weights in Redis.

So how can I get the "result" without converting "weights" to a map or putting it in Redis? Is there a way to filter an RDD based on another map-like RDD without using an external data store service? Thanks!



2 answers


The "java out of memory" error occurs because Spark uses its spark.default.parallelism property when determining the number of partitions, and that property defaults to the number of cores.

// From CoarseGrainedSchedulerBackend.scala

override def defaultParallelism(): Int = {
   conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

      

When the input gets large and you have limited memory, you should increase the number of partitions.



You can do something like this:

 val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g") 
 val splitSize = 10000 // specify some number of elements that fit in memory.

 val numSplits = (input.size / splitSize) + 1 // has to be > 0.
 val groups = sc.parallelize(input, numSplits) // specify the # of splits.

 val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap

 def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
 val result = groups.filter(isHeavy)
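
The weights map in the snippet above is captured by the closure and shipped along with the tasks. A possible variation, sketched below, wraps the map in a broadcast variable so it is sent to each executor once. It assumes the weights fit in memory on the driver and on every executor. The names weightsBc and the application name are made up for this illustration, the local master is only there for the demo, and treating items without a weight as 0 is a choice of this sketch, not something the question specifies.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's) if there is one; local master is demo-only.
val conf = new SparkConf().setAppName("broadcast-weights-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))

// Assumption: the whole weight table fits in memory, so it can be collected and broadcast.
val weightsBc = sc.broadcast(weights.collectAsMap())

// Keep the groups whose summed weight exceeds 12; items without a weight count as 0.
val result = groups.filter { g =>
  g.split(",").map(item => weightsBc.value.getOrElse(item, 0)).sum > 12
}
```

If the weight table does not fit in memory, this does not help, and a join-based approach is the more likely fit.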

      

You can also consider increasing the executor memory size with spark.executor.memory.
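
As a rough illustration of where these two properties can be set, here is a minimal configuration sketch for a standalone application. The application name and the values "4g" and "200" are placeholders, not recommendations; the master URL is assumed to come from spark-submit.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; tune them for your cluster and data size.
val conf = new SparkConf()
  .setAppName("filter-groups")               // hypothetical application name
  .set("spark.executor.memory", "4g")        // memory available to each executor
  .set("spark.default.parallelism", "200")   // default partition count for parallelize, join, reduceByKey, ...

// Pass conf to new SparkContext(conf) when the application starts.
```

In the Spark shell the context already exists, so these settings would have to be given at launch instead, for example with --executor-memory and --conf spark.default.parallelism=200.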



Let's assume your groups are unique. Otherwise, make them unique first (with distinct, etc.). If either the groups or the weights are small, this should be easy. If both groups and weights are huge, you can try this, which might be more scalable but also looks complicated.



val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
// g1 maps each item to its group: (a, List(a,b,c,d)), (b, List(a,b,c,d)), (c, List(a,b,c,d)), ...
val g1 = groups.flatMap(s => s.split(",").map(x => (x, Seq(s))))
// j joins on the item: (a, (List(a,b,c,d), 3)), ...
val j = g1.join(weights)
// k drops the item key: (List(a,b,c,d), 3), (List(a,b,c,d), 2), ...
val k = j.map(x => (x._2._1, x._2._2))
// l collects the weights per group: (List(a,b,c,d), (3, 2, 5, 1)), ...
val l = k.groupByKey()
// keep only the groups whose weights sum to more than 12
val m = l.filter(x => x._2.sum > 12)
// we only need the original group
val result = m.map(x => x._1)
// don't do this in a real product, otherwise all results go to the driver; use saveAsTextFile, etc. instead
scala> result.foreach(println)
List(e,g)
List(b,c,e)
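
The same join idea can be sketched a little more compactly by summing with reduceByKey instead of collecting every weight with groupByKey. This is a variation on the code above, not the original answer's method. It keeps the same assumption that groups are unique, and items with no entry in weights are silently dropped by the inner join. The names itemToGroup and groupSums and the application name are made up for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's) if there is one; local master is demo-only.
val conf = new SparkConf().setAppName("join-reduce-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))

// (item, group) pairs: ("a", "a,b,c,d"), ("b", "a,b,c,d"), ...
val itemToGroup = groups.flatMap(g => g.split(",").map(item => (item, g)))

// Join on the item, re-key by group, and add up the weights per group.
val groupSums = itemToGroup.join(weights)
  .map { case (_, (group, weight)) => (group, weight) }
  .reduceByKey(_ + _)

// Keep the groups whose total weight exceeds 12.
val result = groupSums.filter { case (_, total) => total > 12 }.keys
```

Because reduceByKey combines values on the map side before the shuffle, it usually moves less data than groupByKey followed by a sum.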

      
