PySpark: Key Intersection

For example, I have two RDDs in PySpark. The first one is:

((0,0), 1)
((0,1), 2)
((1,0), 3)
((1,1), 4)


and the second one is just:

((0,1), 3)
((1,1), 0)


I want the intersection of the first RDD with the second. In effect, the second RDD should act as a mask for the first. The output should be:

((0,1), 2)
((1,1), 4)


That is, I want the values from the first RDD, but only for the keys that appear in the second. The lengths of the two RDDs differ.

I have a rough solution (still unverified), something like this:

rdd3 = rdd1.cartesian(rdd2)
rdd4 = rdd3.filter(lambda pair: pair[0][0] == pair[1][0])  # keep pairs whose keys match
rdd5 = rdd4.map(lambda pair: pair[0])  # keep the (key, value) pair from rdd1
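
For reference, a quick sanity check of this on the sample data (a sketch, assuming a SparkContext named sc):

rdd1 = sc.parallelize([((0, 0), 1), ((0, 1), 2), ((1, 0), 3), ((1, 1), 4)])
rdd2 = sc.parallelize([((0, 1), 3), ((1, 1), 0)])
masked = rdd1.cartesian(rdd2) \
             .filter(lambda pair: pair[0][0] == pair[1][0]) \
             .map(lambda pair: pair[0])
print(masked.collect())  # [((0, 1), 2), ((1, 1), 4)], order may vary

Note that if rdd2 contained duplicate keys, this approach would emit duplicate rows.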


I don't know how efficient this solution is, though. I would like to hear the opinions of experienced Spark programmers.



1 answer


Perhaps we shouldn't think of this process as a join. You don't really want to join the two datasets; you want to subtract one dataset from the other, don't you?

Here is what I'm taking from your question:

  • You don't care about the values in the second dataset at all.
  • You only want to keep the values from the first dataset whose keys appear in the second dataset.

Idea 1: cogroup (I think the fastest way). It essentially computes the key intersection of both datasets.

rdd1 = sc.parallelize([((0, 0), 1), ((0, 1), 2), ((1, 0), 3), ((1, 1), 4)])
rdd2 = sc.parallelize([((0, 1), 3), ((1, 1), 0)])
# keep only keys that received values from both sides of the cogroup
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
# for each surviving key, take the first value from rdd1's side
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0])[0]))
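
Collecting the result on the sample data should give the desired output (ordering may vary):

print(final_rdd.collect())  # [((0, 1), 2), ((1, 1), 4)]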



Idea 2: subtractByKey



rdd1 = sc.parallelize([((0, 0), 1), ((0, 1), 2), ((1, 0), 3), ((1, 1), 4)])
rdd2 = sc.parallelize([((0, 1), 3), ((1, 1), 0)])

# rows of rdd1 whose keys do NOT appear in rdd2
unwanted_rows = rdd1.subtractByKey(rdd2)
# subtracting those from rdd1 leaves the rows whose keys DO appear in rdd2
wanted_rows = rdd1.subtractByKey(unwanted_rows)
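
On the sample data, the two steps should yield (ordering may vary):

print(unwanted_rows.collect())  # [((0, 0), 1), ((1, 0), 3)]
print(wanted_rows.collect())    # [((0, 1), 2), ((1, 1), 4)]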


I'm not 100% sure whether this is faster than your method; it requires two subtractByKey operations, which can be slow. Also, this method does not preserve order (for example, ((0, 1), 2) comes first in your first dataset but second in the final one), but I can't imagine that matters.

As for which is faster, I think it depends on how long your cartesian join takes. Mapping and filtering are generally faster than the shuffle operations required by subtractByKey, but of course cartesian is a time-consuming operation.

Anyway, I think you can try both methods and see which works better for you!


Ways to improve performance, depending on how big your RDDs are:

If rdd1 is small enough to be held in main memory, the subtraction process could be sped up by broadcasting it and then streaming rdd2 against it. However, I admit that this is rarely the case.
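
A minimal sketch of that broadcast idea, applied the other way around: collecting and broadcasting the key set of the small mask rdd2, then filtering rdd1 against it with a plain map-side filter (no shuffle needed):

rdd1 = sc.parallelize([((0, 0), 1), ((0, 1), 2), ((1, 0), 3), ((1, 1), 4)])
rdd2 = sc.parallelize([((0, 1), 3), ((1, 1), 0)])

# assumes rdd2's key set fits comfortably in memory
mask_keys = sc.broadcast(set(rdd2.keys().collect()))
wanted_rows = rdd1.filter(lambda kv: kv[0] in mask_keys.value)
print(wanted_rows.collect())  # [((0, 1), 2), ((1, 1), 4)]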
