Merging RDDs using Scala Apache Spark

I have 2 RDDs.

RDD1: ((String, String), Int)
RDD2: (String, Int)

      

For example:

    RDD1

    ((A, X), 1)
    ((B, X), 2)
    ((A, Y), 2)
    ((C, Y), 3)

    RDD2

    (A, 6)
    (B, 7)
    (C, 8)

Output Expected

    ((A, X), 6)
    ((B, X), 14)
    ((A, Y), 12)
    ((C, Y), 24)

      

In RDD1, each (String, String) key is unique, and in RDD2 each String key is unique. Every value in RDD1 should be multiplied by the RDD2 score for the first element of its key. For example, the score for A in RDD2 (6) multiplies the value of every RDD1 record that has A in its key:

    6 = 6 * 1
    14 = 7 * 2
    12 = 6 * 2
    24 = 8 * 3

      

I wrote the following, but it gives me an error at the case keyword:

val finalRdd = countRdd.join(countfileRdd).map(case (k, (ls, rs)) => (k, (ls * rs)))

      

Can someone help me with this?



1 answer


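Your first RDD does not have the same key type as the second one: its keys are tuples such as (A, X), while the second RDD is keyed by a plain String such as A. join only pairs records whose keys are equal, so re-key the first RDD by the first element of its tuple before joining, then rebuild the tuple key afterwards. Note also that a pattern-matching function literal needs braces, map { case ... }, rather than parentheses; map(case ...) does not parse. With both points fixed: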



// Small sample of the question's data
val rdd1 = sc.parallelize(List((("A", "X"), 1), (("A", "Y"), 2)))
val rdd2 = sc.parallelize(List(("A", 6)))

// Re-key rdd1 by the letter so that its key type matches rdd2
val rdd1Transformed = rdd1.map {
  case ((letter, coord), value) => (letter, (coord, value))
}

// Join on the letter, then restore the (letter, coord) key and multiply
val result = rdd1Transformed
  .join(rdd2)
  .map {
    case (letter, ((coord, v1), v2)) => ((letter, coord), v1 * v2)
  }

result.collect()
// res1: Array[((String, String), Int)] = Array(((A,X),6), ((A,Y),12))
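
For completeness, here is a sketch of the same re-key, join, re-key approach run against the full data set from the question. The object name MergeScores and the helper multiplyByFirstKey are made up for illustration, and the local[*] master is only an assumption to make it runnable on one machine. It also assumes Spark 1.3 or later, where join is available on pair RDDs without importing SparkContext._.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MergeScores {
  // Hypothetical helper: multiplies each value in `pairs` by the factor
  // that `factors` holds for the first element of the pair's key.
  def multiplyByFirstKey(
      pairs: RDD[((String, String), Int)],
      factors: RDD[(String, Int)]
  ): RDD[((String, String), Int)] =
    pairs
      // Move the letter out of the tuple key so both RDDs share a key type
      .map { case ((letter, coord), value) => (letter, (coord, value)) }
      // Inner join on the letter
      .join(factors)
      // Restore the original (letter, coord) key and multiply the two values
      .map { case (letter, ((coord, value), factor)) => ((letter, coord), value * factor) }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, purely for trying the example out
    val conf = new SparkConf().setAppName("merge-scores").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq(
        (("A", "X"), 1), (("B", "X"), 2), (("A", "Y"), 2), (("C", "Y"), 3)
      ))
      val rdd2 = sc.parallelize(Seq(("A", 6), ("B", 7), ("C", 8)))

      // collect() gives no ordering guarantee, so sort by key before printing
      multiplyByFirstKey(rdd1, rdd2).collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because join is an inner join, any record in RDD1 whose letter has no entry in RDD2 is dropped from the result. If those records should be kept, leftOuterJoin returns the RDD2 value as an Option that can be given a default.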

      
