How do I combine rows from a large table with a small table, keeping only the keys present in the small table?

I have two data frames, one large and one small:

val small_df = sc.parallelize(List(("Alice", 15), ("Bob", 20))).toDF("name", "age")

val large_df = sc.parallelize(List(("Bob", 40), ("SomeOne", 50), ... )).toDF("name", "age")

      

I want to sum these two sets of data, but keep only the rows whose key appears in my small table; that is, I want my result to look like this:

List(("Alice", 15), ("Bob", 60))

      

My first try was union followed by reduceByKey, but I can't seem to find a way to union the two tables while keeping only the rows whose keys appear in the smaller one.

Is there a way to do something like a "left union", or some other way to approach this?
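The intended semantics can be sketched with plain Scala collections (a hypothetical model, not Spark code; the names and values are taken from the example above):

```scala
// Model of the desired "left union + sum" with plain Lists instead of DataFrames.
val small = List(("Alice", 15), ("Bob", 20))
val large = List(("Bob", 40), ("SomeOne", 50))

val smallKeys = small.map(_._1).toSet
// Keep only the large rows whose key exists in the small table, then sum per key.
val result = (small ++ large.filter { case (name, _) => smallKeys(name) })
  .groupBy(_._1)
  .map { case (name, rows) => (name, rows.map(_._2).sum) }
// result == Map("Alice" -> 15, "Bob" -> 60)
```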





2 answers


One way to solve this problem is to do an outer join and then sum the two resulting age columns. Note that you must import spark.implicits._ to use $, and org.apache.spark.sql.functions.broadcast for broadcast.

If either of the two data frames contains duplicates (in the name column), the final data frame will contain duplicates as well, which may or may not be what you want. Duplicates from large_df will only appear if there is a matching name in small_df, as stated in the question.
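That duplicate behaviour can be illustrated with plain Scala collections (a hypothetical model of the right-outer join plus sum, not Spark code; the second "Bob" row in the large list is invented for the example):

```scala
// Plain-List model of a right-outer join on "name" followed by summing ages.
// The duplicated "Bob" row in the large list is a hypothetical addition.
val small = List(("Bob", 20), ("Alice", 15))
val largeWithDup = List(("Bob", 40), ("Bob", 41), ("SomeOne", 50))

val joined = small.flatMap { case (name, sAge) =>
  val matches = largeWithDup.filter(_._1 == name)
  if (matches.isEmpty) List((name, sAge))                     // unmatched small row kept as-is
  else matches.map { case (_, lAge) => (name, sAge + lAge) }  // one output row per match
}
// joined == List(("Bob", 60), ("Bob", 61), ("Alice", 15))
```

Each duplicate in the large list produces its own output row, so the result above has two "Bob" rows.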



As an optimization, since one of the data frames is small, it can be broadcast before the join to improve performance.

val small_df = sc.parallelize(List(("Alice", 15), ("Bob", 20))).toDF("name", "age")
val large_df = sc.parallelize(List(("Bob", 40), ("SomeOne", 50))).toDF("name", "age")

val df = large_df.withColumnRenamed("age", "large_age")
  .join(broadcast(small_df), Seq("name"), "right_outer")
val df2 = df.withColumn("age", when($"large_age".isNotNull, $"age" + $"large_age").otherwise($"age"))
  .select("name", "age")
df2.show
df2.show

+-----+----+
| name| age|
+-----+----+
|Alice|15.0|
|  Bob|60.0|
+-----+----+

      





This should give you what you want:



import org.apache.spark.sql.functions.sum

val existingKeys = small_df.
  join(large_df, "name").
  select($"name", large_df("age"))
val all = small_df.
  union(existingKeys).
  groupBy("name").
  agg(sum("age") as "age")
scala> all.show
+-----+---+
| name|age|
+-----+---+
|  Bob| 60|
|Alice| 15|
+-----+---+

      









