How do I concatenate only those rows (from a large table) with keys in the left small table?
I have two data frames, one large and one small:
val small_df = sc.parallelize(List(("Alice", 15), ("Bob", 20))).toDF("name", "age")
val large_df = sc.parallelize(List(("Bob", 40), ("SomeOne", 50), ...)).toDF("name", "age")
I want to sum the ages from these two data frames, but only for names that have a key in the small table. That is, I want the result to look like this:
List(("Alice", 15), ("Bob", 60))
My first attempt was to use union and reduceByKey, but I can't find a way to union the two tables while keeping only the rows whose keys appear in the smaller one.
Is there a way to do something like a "left union", or some other way to get this result?
One way to solve this problem is to do an outer join and then add the two resulting age columns together. Note that you must import spark.implicits._ to use $ and org.apache.spark.sql.functions.broadcast to use broadcast.
If either data frame contains duplicates (in the name column), the final data frame will contain duplicates too, which may or may not be what you want. Duplicates in large_df will only appear if there is a matching name in small_df, as stated in the question.
As an optimization, since one of the data frames is small, it can be broadcast before the join to improve performance.
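import spark.implicits._
import org.apache.spark.sql.functions.{broadcast, when}

val small_df = sc.parallelize(List(("Alice", 15), ("Bob", 20))).toDF("name", "age")
val large_df = sc.parallelize(List(("Bob", 40), ("SomeOne", 50))).toDF("name", "age")
// The right outer join keeps every name in small_df; large_age is null where large_df has no match.
val df = large_df.withColumnRenamed("age", "large_age").join(broadcast(small_df), Seq("name"), "right_outer")
// Add large_age to age only where a match exists.
val df2 = df.withColumn("age", when($"large_age".isNotNull, $"age" + $"large_age").otherwise($"age")).select("name", "age")
df2.show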
+-----+----+
| name| age|
+-----+----+
|Alice|15.0|
|  Bob|60.0|
+-----+----+
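If the duplicates caveat above is a concern, one option is to collapse each data frame to one row per name before joining. The following is only a sketch under assumptions: the sample rows with repeated names are hypothetical, the local SparkSession is created just to keep the snippet self-contained (in spark-shell, getOrCreate returns the existing session), and coalesce is swapped in for the when/otherwise expression used above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, sum}

// Assumption: a local session, only so the sketch runs on its own.
val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data with a duplicated name in each frame.
val small_df = Seq(("Alice", 15), ("Bob", 20), ("Bob", 5)).toDF("name", "age")
val large_df = Seq(("Bob", 40), ("Bob", 10), ("SomeOne", 50)).toDF("name", "age")

// Collapse duplicates so each name appears once per frame.
val smallAgg = small_df.groupBy("name").agg(sum("age") as "age")
val largeAgg = large_df.groupBy("name").agg(sum("age") as "large_age")

// The left outer join keeps every name from the small frame;
// names missing from the large frame contribute 0.
val result = smallAgg
  .join(largeAgg, Seq("name"), "left_outer")
  .select($"name", ($"age" + coalesce($"large_age", lit(0L))) as "age")

result.show()
```

Because sum widens integer columns, the resulting age column is a long rather than an int. With this sample data, "SomeOne" is dropped and each remaining name appears exactly once.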
This should give you what you want:
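import org.apache.spark.sql.functions.sum

// Rows of large_df whose name also appears in small_df (inner join), keeping large_df's age.
val existingKeys = small_df.
  join(large_df, "name").
  select($"name", large_df("age"))
// Stack those rows under small_df and sum the ages per name.
val all = small_df.
  union(existingKeys).
  groupBy("name").
  agg(sum("age") as "age")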
scala> all.show
+-----+---+
| name|age|
+-----+---+
|  Bob| 60|
|Alice| 15|
+-----+---+
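A close variant of the same idea, and perhaps the nearest thing to the "left union" asked about, is to filter large_df with a left semi join before the union. This is a sketch rather than a definitive implementation: the local SparkSession is an assumption made to keep the snippet self-contained, and the broadcast hint is optional.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, sum}

// Assumption: a local session, only so the sketch runs on its own.
val spark = SparkSession.builder().master("local[*]").appName("semi-join-sketch").getOrCreate()
import spark.implicits._

val small_df = Seq(("Alice", 15), ("Bob", 20)).toDF("name", "age")
val large_df = Seq(("Bob", 40), ("SomeOne", 50)).toDF("name", "age")

// left_semi keeps only the large_df rows whose name exists in small_df
// and returns only large_df's columns, so no rename or select is needed.
val matching = large_df.join(broadcast(small_df), Seq("name"), "left_semi")

// Stack the matching rows under small_df and sum the ages per name.
val summed = small_df
  .union(matching)
  .groupBy("name")
  .agg(sum("age") as "age")

summed.show()
```

Unlike the inner join, the semi join never multiplies large_df rows when small_df contains the same name more than once, so each matching large_df age is counted a single time.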