PySpark, Key Intersection
For example, I have two RDDs in PySpark:
((0,0), 1) ((0,1), 2) ((1,0), 3) ((1,1), 4)
and the second one is just
((0,1), 3) ((1,1), 0)
I want the intersection of the first RDD with the second. In effect, the second RDD should act as a mask for the first. The output should be:
((0,1), 2) ((1,1), 4)
this means the values come from the first RDD, but only for keys that appear in the second. The two RDDs have different lengths.
I have some kind of solution (needs to be proven), but something like this:
# Note: tuple-unpacking lambdas were removed in Python 3, so index instead.
rdd3 = rdd1.cartesian(rdd2)
rdd4 = rdd3.filter(lambda pair: pair[0][0] == pair[1][0])
rdd5 = rdd4.map(lambda pair: pair[0])
I don't know how efficient this solution is. I would like to hear the opinion of experienced Spark programmers.
Perhaps we shouldn't think of this process as a join. You don't really want to join the two datasets; you want to subtract one dataset from the other, right?
Here is what I am taking from your question:
- You don't care about the values in the second dataset at all.
- You only want to keep the rows of the first dataset whose key appears in the second dataset.
Idea 1: Cogroup (I think the fastest way). It essentially computes the key intersection of both datasets.
rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0])[0]))
Idea 2: Subtract by Key
rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
unwanted_rows = rdd1.subtractByKey(rdd2)
wanted_rows = rdd1.subtractByKey(unwanted_rows)
I'm not 100% sure this is faster than your method. It requires two subtractByKey operations, which can be slow. Also, this method does not preserve order (for example, ((0, 1), 2) comes early in your first dataset but may land elsewhere in the result). But I can't imagine that matters.
As for which is faster, I think it depends on how long your cartesian join takes. Maps and filters are generally faster than the shuffles required by subtractByKey, but of course cartesian is an expensive operation: it produces len(rdd1) * len(rdd2) pairs before the filter runs.
Anyway, I think you can try this method and see if it works for you!
There is also room to improve performance, depending on how big your RDDs are.
If rdd1 is small enough to be held in main memory, the subtraction process can be sped up by broadcasting it and then mapping rdd2 against it. However, I admit that this is rarely the case.