How do I declare a sparse vector in Spark using Scala?

I'm trying to create a sparse vector (the mllib.linalg.Vectors kind, not the default), but I can't figure out how to use Seq. I have a small test file with 3 numbers per line, which I convert to an RDD by splitting the text into doubles and then grouping the lines by their first column.

Test file

1 2 4
1 3 5    
1 4 8    
2 7 5    
2 8 4    
2 9 10

Code

val data = sc.textFile("/home/savvas/DWDM/test.txt")
val data2 = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
val grouped = data2.groupBy( _(0) )

This results in grouped having these values:

(2.0,CompactBuffer([2.0,7.0,5.0], [2.0,8.0,4.0], [2.0,9.0,10.0]))
(1.0,CompactBuffer([1.0,2.0,4.0], [1.0,3.0,5.0], [1.0,4.0,8.0]))

But I can't figure out the next step. For each row of grouped I need to create a vector, where the third value of each CompactBuffer entry goes at the index indicated by its second value. In short, I want my data to look like the following example:

[0, 0, 0, 0, 0, 0, 5.0, 4.0, 10.0, 0]
[0, 4.0, 5.0, 8.0, 0, 0, 0, 0, 0, 0]

I know I need to use a sparse vector, and that there are three ways to construct one. I tried using a Seq of (index, value) tuples, but I can't figure out how to create such a Seq.
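If I understand the API correctly, the variant I mean is Vectors.sparse(size, elements), where elements is a Seq of (index, value) pairs, e.g. something like:

import org.apache.spark.mllib.linalg.Vectors

// a length-10 vector with 4.0 at index 2, 5.0 at index 3 and 8.0 at index 4
val v = Vectors.sparse(10, Seq((2, 4.0), (3, 5.0), (4, 8.0)))

but I don't know how to build that Seq from my grouped RDD.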



1 answer


One possible solution is something like the one below. First, convert the data to the expected types:

import org.apache.spark.rdd.RDD

val pairs: RDD[(Double, (Int, Double))] = data.map(_.split(" ") match {
  case Array(label, idx, value) => (label.toDouble, (idx.toInt, value.toDouble))
})

Then find the maximum index (which gives the size of the vectors):

val nCols = pairs.map{case (_, (i, _)) => i}.max + 1

Group by label and convert:

import org.apache.spark.mllib.linalg.SparseVector

def makeVector(xs: Iterable[(Int, Double)]) = {
  val (indices, values) = xs.toArray.sortBy(_._1).unzip
  new SparseVector(nCols, indices.toArray, values.toArray)
}

val transformed: RDD[(Double, SparseVector)] = pairs
  .groupByKey
  .mapValues(makeVector)
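To eyeball the result, you can collect it and print each vector in dense form (just a quick sanity check on a toy dataset; the variable names are from the snippets above):

transformed.collect.foreach { case (label, vec) =>
  // expand the sparse vector to an array so the zeros are visible
  val dense = vec.toArray.mkString("[", ", ", "]")
  println(s"$label -> $dense")
}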


Another way you can handle this, assuming the first elements can be safely converted to and from an integer, is to use a CoordinateMatrix:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = data.map(_.split(" ") match {
  case Array(label, idx, value) => 
    MatrixEntry(label.toInt, idx.toInt, value.toDouble)
})

val transformed: RDD[(Double, SparseVector)] = new CoordinateMatrix(entries)
  .toIndexedRowMatrix
  .rows
  .map(row => (row.index.toDouble, row.vector.toSparse)) // IndexedRow.vector is typed as Vector, so convert explicitly
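If you don't strictly need a SparseVector downstream, a variant of the same idea (sketch only; transformedGeneric is just an illustrative name) keeps the general Vector type and skips the toSparse call:

import org.apache.spark.mllib.linalg.Vector

val transformedGeneric: RDD[(Double, Vector)] = new CoordinateMatrix(entries)
  .toIndexedRowMatrix
  .rows
  .map(row => (row.index.toDouble, row.vector))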

