How can I aggregate a Spark dataframe to get a sparse vector using Scala?
I have a dataframe like the one below in Spark, and I want to group it by the column id, and then for each group create a sparse vector whose elements come from the weight column at the indices given by the index column. The length of the sparse vector is known, say 1000 for this example.
Dataframe df:
+-----+------+-----+
| id|weight|index|
+-----+------+-----+
|11830| 1| 8|
|11113| 1| 3|
| 1081| 1| 3|
| 2654| 1| 3|
|10633| 1| 3|
|11830| 1| 28|
|11351| 1| 12|
| 2737| 1| 26|
|11113| 3| 2|
| 6590| 1| 2|
+-----+------+-----+
I read this, which is similar to what I want to do, but for RDDs. Does anyone know a good way to do this for a dataframe in Spark using Scala?
My attempt so far is to first collect the weights and indices as lists like this:
import org.apache.spark.sql.functions.collect_list

val dfWithLists = df
  .groupBy("id")
  .agg(collect_list("weight") as "weights", collect_list("index") as "indices")
which looks like this:
+-----+---------+----------+
| id| weights| indices|
+-----+---------+----------+
|11830| [1, 1]| [8, 28]|
|11113| [1, 3]| [3, 2]|
| 1081| [1]| [3]|
| 2654| [1]| [3]|
|10633| [1]| [3]|
|11351| [1]| [12]|
| 2737| [1]| [26]|
| 6590| [1]| [2]|
+-----+---------+----------+
Then I define a UDF and do something like this:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)
val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))
but that doesn't seem to work (among other things, collect_list hands the UDF Seq-backed columns, not Array s, so the input types don't line up), and it seems like there should be an easier way to do it without having to collect the weights and indices into lists first.
I am new to Spark, Dataframes and Scala so any help is greatly appreciated.
You do need to collect them, since vectors have to be local data structures on a single machine: https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector
To create sparse vectors you have two options: either pass unordered (index, value) pairs, or pass separate arrays of indices and values: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors$
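For illustration, both constructors yield the same vector (a minimal sketch using the mllib Vectors factory linked above):

import org.apache.spark.mllib.linalg.Vectors

// Option 1: vector size plus a Seq of unordered (index, value) pairs
val v1 = Vectors.sparse(5, Seq((3, 1.0), (1, 2.0)))

// Option 2: vector size plus parallel arrays of indices and values
val v2 = Vectors.sparse(5, Array(1, 3), Array(2.0, 1.0))

// v1 and v2 represent the same vector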
If you can get the data into a different shape (pivoted, with one column per index), you can also use VectorAssembler, as sketched below: https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
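A rough sketch of that route (dfPivoted and the idx_ column prefix are hypothetical names; note that 1000 possible indices would mean 1000 columns, so the approach below scales better):

import org.apache.spark.ml.feature.VectorAssembler

// Hypothetical: dfPivoted has one numeric column per index, e.g. idx_2, idx_3, ...
val assembler = new VectorAssembler()
  .setInputCols(dfPivoted.columns.filter(_.startsWith("idx_")))
  .setOutputCol("features")
val assembled = assembler.transform(dfPivoted)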
With a few small tweaks, you can make your approach work:
:paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val df = Seq(
  (11830, 1, 8), (11113, 1, 3), (1081, 1, 3), (2654, 1, 3), (10633, 1, 3),
  (11830, 1, 28), (11351, 1, 12), (2737, 1, 26), (11113, 3, 2), (6590, 1, 2)
).toDF("id", "weight", "index")

val dfWithFeat = df
  .rdd
  // key each row by id; the value is an (index, weight) pair with weight as Double
  .map(r => (r.getInt(0), (r.getInt(2), r.getInt(1).toDouble)))
  // gather all (index, weight) pairs belonging to the same id
  .groupByKey()
  // build a length-1000 sparse vector from the pairs, using the id as the label
  .map(r => LabeledPoint(r._1, Vectors.sparse(1000, r._2.toSeq)))
  .toDS
dfWithFeat.printSchema
dfWithFeat.show(10, false)
// Exiting paste mode, now interpreting.
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
+-------+-----------------------+
|label |features |
+-------+-----------------------+
|11113.0|(1000,[2,3],[3.0,1.0]) |
|2737.0 |(1000,[26],[1.0]) |
|10633.0|(1000,[3],[1.0]) |
|1081.0 |(1000,[3],[1.0]) |
|6590.0 |(1000,[2],[1.0]) |
|11830.0|(1000,[8,28],[1.0,1.0])|
|2654.0 |(1000,[3],[1.0]) |
|11351.0|(1000,[12],[1.0]) |
+-------+-----------------------+
dfWithFeat: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector]
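For completeness, your original collect_list + UDF attempt can also be made to work with a couple of type fixes: collect_list gives the UDF Seq arguments rather than Array s, and the (index, value)-pair form of Vectors.sparse avoids having to sort the indices. A sketch along those lines (assuming weight is an Int column as in the example, and that spark.implicits are in scope as in the shell):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// collect_list yields Seq-backed columns, so the UDF takes Seq, not Array
val toSparseVector = udf { (indices: Seq[Int], weights: Seq[Int]) =>
  Vectors.sparse(1000, indices.zip(weights.map(_.toDouble)))
}

val dfWithSparseVector =
  dfWithLists.withColumn("SparseVector", toSparseVector($"indices", $"weights"))

One caveat: two independent collect_list aggregations are not guaranteed to keep weights and indices in matching order, so collecting (index, weight) structs in a single list is the safer variant.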