How to convert a Spark RDD to a Mahout DRM?

I am fetching data from Alluxio into Spark using sc.textFile(), but that gives me a Spark RDD. The rest of my program expects this data as a Mahout DRM, so I need to convert the RDD to a DRM. That way my existing code stays unchanged.
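For context, a minimal sketch of my starting point (the alluxio:// URI and the comma-separated format are just examples of my input):

val lines = sc.textFile("alluxio://localhost:19998/data/matrix.csv")  // RDD[String], one element per line

// Parse each line into a row of doubles
val rows = lines.map(_.split(",").map(_.toDouble))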



2 answers


An Apache Mahout DRM can be created from an Apache Spark RDD in the following steps:

  • Convert each row of the RDD to a Mahout Vector
  • Zip the RDD with its index (and swap the fields so each tuple has the form (Long, Vector))
  • Wrap the RDD with drmWrap.

Consider the following code example:



import org.apache.mahout.math.{DenseVector, Vector}
import org.apache.mahout.sparkbindings._

// Rows as plain arrays of doubles (tuples would not convert
// directly to a DenseVector, whose constructor takes Array[Double])
val rddA = sc.parallelize(Array(Array(1.0, 2.0, 3.0),
             Array(2.0, 3.0, 4.0),
             Array(4.0, 5.0, 6.0)))

// Convert each row to a Mahout Vector, zip with its index,
// and swap so the tuples have the (Long, Vector) shape of a DrmRdd
val drmRddA: DrmRdd[Long] = rddA.map(a => new DenseVector(a): Vector)
                 .zipWithIndex()
                 .map(t => (t._2, t._1))

val drmA = drmWrap(rdd = drmRddA)
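Once wrapped, the DRM works with Mahout's R-like distributed algebra. A minimal sketch of a follow-up computation (assuming the Mahout Spark bindings are on the classpath):

import org.apache.mahout.math.drm.RLikeDrmOps._

// Compute A'A with the distributed R-like DSL
val drmAtA = drmA.t %*% drmA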

      

Source / further information / shameless self-promotion (at the bottom): my blog



The main problem with this kind of conversion is usually that Mahout uses integers to refer to the row and column numbers of the overall matrix, while your data typically has its own row and column keys, which are not valid Mahout indices.

Mahout has an object called IndexedDatasetSpark that stores your IDs in BiMaps (actually BiDictionarys) and also creates the Mahout DRM. The advantage is that the dictionaries let you convert the integer row and column indices back into your own IDs after the math is done.



If you have an RDD[(String, String)] of elements for your matrix, this conversion will do it for you. If you have an RDD of strings, you can start from that code to write your own conversion.

https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala#L75
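A minimal sketch of how that conversion might be used (the pair RDD and its contents are illustrative; check the apply signature at the link above):

import org.apache.spark.rdd.RDD
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

// (rowID, columnID) pairs, e.g. (user, item) interactions
val elements: RDD[(String, String)] = sc.parallelize(Seq(
  ("u1", "iphone"), ("u1", "ipad"),
  ("u2", "nexus"), ("u2", "galaxy")))

// Builds the BiDictionaries and wraps the data as a Mahout DRM
val indexed = IndexedDatasetSpark(elements)(sc)

val drm = indexed.matrix        // the Mahout DRM, keyed by Int indices
val rowIDs = indexed.rowIDs     // BiDictionary: row ID <-> Int index
val colIDs = indexed.columnIDs  // BiDictionary: column ID <-> Int index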







