Best way to retrieve and store values with the same keys from multiple RDDs

I created two RDDs in PySpark with data extracted from HBase. I want to collect elements with the same row keys, store the elements, and then look up the values associated with each of the elements. Ideally, I would store the results in a pyspark.sql object, since I want to apply the Levenshtein distance to their content.

More details:

In HBase I have location data where the row key is the geohash for a given area and the columns hold multiple locations in that area, with more details (JSON with a description and other text data) for each location. I have two HBase tables, and the geohashes can be the same in both. I want to search the data in these two RDDs, check for similar geohashes, and store the results in a new data structure.

I don't want to reinvent the wheel and I have just started learning Spark, so I'm wondering: what's the best way to accomplish such a task? Is the built-in rdd.intersection function a good solution?
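
To make the layout concrete, here is roughly the shape of the two RDDs (the geohash keys and JSON values below are made up for illustration; the real data comes from HBase):

rdd1 = sc.parallelize([
    ("u4pruyd", '{"name": "Cafe A", "description": "..."}'),
    ("u4pruyk", '{"name": "Shop B", "description": "..."}'),
])
rdd2 = sc.parallelize([
    ("u4pruyd", '{"name": "Cafe A bis", "description": "..."}'),
    ("u4pruyz", '{"name": "Bar C", "description": "..."}'),
])
# note: rdd1.intersection(rdd2) compares the whole (key, value) element,
# not just the geohash key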



2 answers


Edited: In fact, thanks to @Aneel's comments, I was able to fix some of my mistakes. There is actually a join call on RDDs which gives the same thing as the JOIN call with Spark SQL (the join is done on the first column of the RDD, and the values are a tuple of the rest of the columns of both RDDs), so use that instead of the cogroup I suggested earlier, since, as @Aneel pointed out, cogroup squashes the key-value pairs under one single key.
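
To make the difference concrete, here is a small sketch (toy data, unrelated to the benchmark below; sc is the SparkContext from the PySpark shell) of what the two calls return for the same pair RDDs:

a = sc.parallelize([(1, "x"), (1, "y"), (2, "z")])
b = sc.parallelize([(1, "u"), (3, "v")])

# join: one output record per matching pair of values, only for keys present in both RDDs
a.join(b).collect()
# -> [(1, ('x', 'u')), (1, ('y', 'u'))]   (ordering may vary)

# cogroup: one output record per key, with all values grouped on each side
[(k, (list(l), list(r))) for k, (l, r) in a.cogroup(b).collect()]
# -> [(1, (['x', 'y'], ['u'])), (2, (['z'], [])), (3, ([], ['v']))]   (ordering may vary)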

On another note, I tried both @Aneel's Spark SQL method and the RDD join and compared them a bit. Here are the results using a community-edition cluster (very small: 6 GB of memory, 1 core, Spark 2.1); here is the link (the code is also at the end of this post).

Here are the results:

  • For a list of 100,000 elements:
    • Spark SQL: 1.32s
    • RDD join: 0.89s
  • For a list of 250,000 elements:
    • Spark SQL: 2.2s
    • RDD join: 2.0s
  • For a list of 500,000 elements:
    • Spark SQL: 3.6s
    • RDD join: 4.6s
  • For a list of 1,000,000 elements:
    • Spark SQL: 7.7s
    • RDD join: 10.2s
  • For a list of 10,000,000 elements (here I told timeit to run only 10 tests, or it would have kept running until Christmas; accuracy decreases, of course):
    • Spark SQL: 57.6s
    • RDD join: 89.9s

It actually looks like RDD joins are faster than DataFrames for small datasets, but once you hit a threshold (around 250k records) the DataFrame join starts to be faster.

As @Aneel suggested, keep in mind that I made a pretty basic example, and you might need to do some testing on your own dataset and environment (I didn't go beyond 10M rows in my 2 lists; it already took 2.6 minutes to initialize).



Initialization code:

#Init code
# sc and spark are the SparkContext / SparkSession provided by the PySpark shell or notebook
NUM_TESTS=100
from random import randint
l1 = []
l2 = []

import timeit
for i in xrange(0, 10000000): # Python 2 here; use range on Python 3
  t = (randint(0,2000), randint(0,2000))
  v = randint(0,2000)
  l1.append((t,v))
  if (randint(0,100) > 25): # at least 25% of the keys should be similar (t from l1 is reused otherwise)
    t = (randint(0,2000), randint(0,2000))
  v = randint(0,2000)
  l2.append((t,v))

rdd1 = sc.parallelize(l1)
rdd2 = sc.parallelize(l2)

      

Spark SQL Test:

#Test Spark SQL    
def callable_ssql_timeit():
  df1 = spark.createDataFrame(rdd1).toDF("id", "val")
  df1.createOrReplaceTempView("table1")
  df2 = spark.createDataFrame(rdd2).toDF("id", "val")
  df2.createOrReplaceTempView("table2")
  query="SELECT * FROM table1 JOIN table2 ON table1.id=table2.id"
  spark.sql(query).count()


print(str(timeit.timeit(callable_ssql_timeit, number=NUM_TESTS)/float(NUM_TESTS)) +  "s")

      

RDD join test:

#Test RDD join
def callable_rdd_timeit():
  rdd1.join(rdd2).count()
print(str(timeit.timeit(callable_rdd_timeit, number=NUM_TESTS)/float(NUM_TESTS)) + "s")

      



Since you want to use pyspark.sql DataFrames, how about converting RDDs to them from the start?

df1 = spark.createDataFrame(rdd1).toDF("geohash", "other", "data")
df1.createOrReplaceTempView("table1")
df2 = spark.createDataFrame(rdd2).toDF("geohash", "other", "data", "fields")
df2.createOrReplaceTempView("table2")
spark.sql("SELECT * FROM table1 JOIN table2 ON table1.geohash = table2.geohash").show()

      



If you want to work with similar (not identical) geohashes, you can register a user-defined function to calculate the distance between them.
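
For instance, here is a minimal sketch of registering such a UDF and using it in the join condition (the geo_distance name and the column names follow the example above and are purely illustrative). Spark SQL also has a built-in levenshtein function in pyspark.sql.functions, which avoids the Python UDF overhead.

from pyspark.sql.types import IntegerType

def levenshtein_py(a, b):
    # plain dynamic-programming edit distance between two strings
    prev = range(len(b) + 1)
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                  # deletion
                           cur[j - 1] + 1,               # insertion
                           prev[j - 1] + (ca != cb)))    # substitution
        prev = cur
    return prev[len(b)]

spark.udf.register("geo_distance", levenshtein_py, IntegerType())

# Join rows whose geohashes are at most one edit apart. A non-equi join like
# this cannot use a hash join, so it is much heavier than the plain equality
# join above.
spark.sql("""
    SELECT t1.geohash, t2.geohash
    FROM table1 t1 JOIN table2 t2
      ON geo_distance(t1.geohash, t2.geohash) <= 1
""").show()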







