Explode multiple columns in SQL Spark table
This question is a follow-up to:
Split (transpose?) Multiple columns in SQL Spark table
Suppose we have additional columns as shown below:
userId someString varA     varB       varC    varD
1      "example1" [0,2,5]  [1,2,9]    [a,b,c] [red,green,yellow]
2      "example2" [1,20,5] [9,null,6] [d,e,f] [white,black,cyan]
and we want to produce the following output:
userId someString varA varB varC varD
1      "example1" 0    1    a    red
1      "example1" 2    2    b    green
1      "example1" 5    9    c    yellow
2      "example2" 1    9    d    white
2      "example2" 20   null e    black
2      "example2" 5    6    f    cyan
The answer there was to define a UDF:
import org.apache.spark.sql.functions.{explode, udf}
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
and then use it with withColumn and explode:
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show
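For reference, on a single row the zip UDF above just calls Scala's Seq.zip. A plain-Scala sketch (no Spark needed; the values are taken from the example rows) showing how elements are paired by index, and that zip silently drops the extra elements when the arrays have different lengths:

```scala
// What the zip UDF computes for one row, outside Spark.
val varA = Seq(0L, 2L, 5L)
val varB = Seq(1L, 2L, 9L)

// Element i of varA is paired with element i of varB.
val pairs = varA.zip(varB)
println(pairs) // List((0,1), (2,2), (5,9))

// zip stops at the shorter collection: extra elements are dropped, not padded.
val truncated = Seq(1L, 20L, 5L).zip(Seq(9L, 6L))
println(truncated) // List((1,9), (20,6))
```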
If we need to extend the above answer to more columns, what is the easiest way to amend the code? Any help is appreciated.
The zip UDF approach looks OK, but you need to extend it for more collections. Unfortunately there is no really good way to zip 4 Seqs, but this should work:
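import org.apache.spark.sql.functions.{explode, udf}
// Fail the task if the arrays in a row do not all have the same length
def assertSameSize(arrs: Seq[_]*) = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}
// Boxed java.lang.Long keeps null elements (such as the null in varB); with Seq[Long] they would be unboxed to 0
val zip4 = udf((xa: Seq[java.lang.Long], xb: Seq[java.lang.Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})
df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB"),
  $"vars._3".alias("varC"), $"vars._4".alias("varD")).show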
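On Spark 2.4 or later there is also a built-in alternative to writing a zipN UDF for every column count: org.apache.spark.sql.functions.arrays_zip, which zips any number of array columns into a single array of structs that explode can then flatten. A minimal sketch, assuming a local SparkSession and the example data from the question (varB is typed as boxed java.lang.Long so it can hold the null). It expands the struct with vars.* and renames the columns with toDF, so it does not rely on the exact struct field names arrays_zip generates:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_zip, col, explode}

// Assumption: a local session just for this example.
val spark = SparkSession.builder().master("local[*]").appName("explode-multiple-columns").getOrCreate()
import spark.implicits._

// Example data from the question; Seq[java.lang.Long] allows the null in varB.
val df = Seq(
  (1, "example1", Seq(0L, 2L, 5L), Seq[java.lang.Long](1L, 2L, 9L), Seq("a", "b", "c"), Seq("red", "green", "yellow")),
  (2, "example2", Seq(1L, 20L, 5L), Seq[java.lang.Long](9L, null, 6L), Seq("d", "e", "f"), Seq("white", "black", "cyan"))
).toDF("userId", "someString", "varA", "varB", "varC", "varD")

// arrays_zip builds one array of structs (element i of every input array);
// explode emits one row per struct.
val result = df
  .withColumn("vars", explode(arrays_zip(col("varA"), col("varB"), col("varC"), col("varD"))))
  .select(col("userId"), col("someString"), col("vars.*"))
  .toDF("userId", "someString", "varA", "varB", "varC", "varD")

result.show()
```

Adding a fifth array column only means passing one more column to arrays_zip and one more name to toDF. It also avoids a Scala UDF, which is opaque to Spark's optimizer. The Spark documentation states that when one array is shorter than the others, arrays_zip returns null for the missing elements, so unequal sizes do not fail the job.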
I am assuming that varA, varB, varC and varD always have the same size, as in your example.
scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String])
defined class Input
scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String)
defined class Result
scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow"))
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a)
scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan"))
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a)
scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields]
scala> input_df.show
+-------+----------+----------+------------+---------+--------------------+
|user_id|someString|      varA|        varB|     varC|                varD|
+-------+----------+----------+------------+---------+--------------------+
|      1|  example1| [0, 2, 5]|   [1, 2, 9]|[a, b, c]|[red, green, yellow]|
|      2|  example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]|
+-------+----------+----------+------------+---------+--------------------+
scala> def getResult(row : Input) : Iterable[Result] = {
| val user_id = row.user_id
| val someString = row.someString
| val varA = row.varA
| val varB = row.varB
| val varC = row.varC
| val varD = row.varD
| val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))}
| seq.toSeq
| }
getResult: (row: Input)Iterable[Result]
scala> val resdf = input_df.flatMap{row => getResult(row)}
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields]
scala> resdf.show
+-------+----------+----+----+----+------+
|user_id|someString|varA|varB|varC|  varD|
+-------+----------+----+----+----+------+
|      1|  example1|   0|   1|   a|   red|
|      1|  example1|   2|   2|   b| green|
|      1|  example1|   5|   9|   c|yellow|
|      2|  example2|   1|   9|   d| white|
|      2|  example2|  20|null|   e| black|
|      2|  example2|   5|   6|   f|  cyan|
+-------+----------+----+----+----+------+
If varA, varB, varC and varD can have different sizes, that case has to be handled explicitly, because getResult above only iterates over the indices of varA.
You can iterate up to the maximum size and output null wherever a column has no value at that index, for example by catching the resulting out-of-bounds exceptions.
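A minimal sketch of that idea in plain Scala. It uses a bounds check instead of catching ArrayIndexOutOfBoundsException (a swapped-in choice, not the answer's own code); the case classes mirror Input and Result from the answer, while getResultPadded and at are hypothetical helper names:

```scala
// Mirrors the Input and Result case classes from the answer.
case class Input(user_id: Integer, someString: String,
                 varA: Array[Integer], varB: Array[Integer],
                 varC: Array[String], varD: Array[String])
case class Result(user_id: Integer, someString: String,
                  varA: Integer, varB: Integer, varC: String, varD: String)

// Hypothetical helper: element i of xs, or null if xs has no element at i.
def at[T >: Null](xs: Array[T], i: Int): T =
  if (i < xs.length) xs(i) else null

// Emit one Result per index up to the longest array, padding shorter arrays with null.
def getResultPadded(row: Input): Seq[Result] = {
  val maxLen = Seq(row.varA.length, row.varB.length, row.varC.length, row.varD.length).max
  (0 until maxLen).map { i =>
    Result(row.user_id, row.someString,
      at(row.varA, i), at(row.varB, i), at(row.varC, i), at(row.varD, i))
  }
}
```

In the shell session above it can be used in place of getResult, e.g. input_df.flatMap(row => getResultPadded(row)).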