Spark DataFrame to RDD and back
I am writing an Apache Spark application in Scala. I use DataFrames to process and store data. I have a nice pipeline with feature extraction and a MultiLayerPerceptron classifier using the ML API.
I also want to use an SVM (for comparison purposes). The thing is (and correct me if I'm wrong) that only MLlib provides SVMs, and MLlib does not handle DataFrames, only RDDs.
So, I figured I could keep the core of my application using DataFrames and still use SVMs: 1) I just convert the DataFrame columns I need to an RDD[LabeledPoint], and 2) after classification, I add the SVM prediction to the DataFrame as a new column.
The first part I was handling with a small function:
private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
    rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
}
I have to specify and convert the type of the vector, since the feature extraction method uses the ML API and not MLlib.
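As an aside, a shorter conversion is also possible (a minimal sketch, assuming Spark 2.x, where org.apache.spark.mllib.linalg.Vectors.fromML is available; the helper name dataFrameToRDDSparse is just for illustration). It avoids densifying the vector:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Alternative conversion: Vectors.fromML copies an ML vector into an MLlib vector
// without going through a dense Array[Double], so sparse vectors stay sparse.
private def dataFrameToRDDSparse(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    dataFrame.select("label", "features").rdd.map { r =>
        val features = r.getAs[org.apache.spark.ml.linalg.SparseVector](1)
        LabeledPoint(r.getInt(0).toDouble, Vectors.fromML(features))
    }
}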
This RDD[LabeledPoint] is then fed to the SVM and the classification goes smoothly, no problem. In the end, following the Spark example, I get an RDD[Double]:
val predictions = rdd.map(point => model.predict(point.features))
Now I want to add that prediction score as a column to the original DataFrame and return it. This is where I am stuck. I can convert the RDD[Double] to a DataFrame using (sqlContext creation omitted):
import sqlContext.implicits._
val plDF = predictions.toDF("prediction")
But how do I join the two DataFrames in such a way that the second DataFrame becomes a column of the original one? I tried the join and union methods, but I got SQL exceptions since the DataFrames have no equal columns to join or unite on.
EDIT: I tried
data.withColumn("prediction", plDF.col("prediction"))
but I am getting a parsing exception :(
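To illustrate why this fails (a minimal sketch of my understanding, assuming data has an integer label column): withColumn builds the new column from an expression over the columns of the DataFrame it is called on, so a Column taken from plDF cannot be resolved against data.

// Works: the expression only references columns that exist in `data`.
val withCast = data.withColumn("label_as_double", data.col("label").cast("double"))

// Fails: plDF.col("prediction") belongs to a different DataFrame,
// so Spark cannot resolve it against `data` and throws an exception.
// val broken = data.withColumn("prediction", plDF.col("prediction"))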
I couldn't figure out how to do this without going back to RDDs, but anyway, here is how I solved it with an RDD. I added the rest of the code so that anyone can understand the complete logic. Any suggestions are greatly appreciated.
package stuff
import java.util.logging.{Level, Logger}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
/**
* Created by camandros on 10-03-2017.
*/
class LinearSVMClassifier extends Classifier with Serializable {

    @transient lazy val log: Logger = Logger.getLogger(getClass.getName)

    private var model : SVMModel = _

    override def train(data : DataFrame): Unit = {
        val rdd = dataFrameToRDD(data)
        // Run training algorithm to build the model
        val numIter : Int = 100
        val step = Osint.properties(Osint.SVM_STEPSIZE).toDouble
        val c = Osint.properties(Osint.SVM_C).toDouble
        log.log(Level.INFO, "Initiating SVM training with parameters: C="+c+", step="+step)
        model = SVMWithSGD.train(rdd, numIterations = numIter, stepSize = step, regParam = c)
        log.log(Level.INFO, "Model training finished")
        // Clear the default threshold so predict() returns raw scores instead of 0/1 labels.
        model.clearThreshold()
    }

    override def classify(data : DataFrame): DataFrame = {
        log.log(Level.INFO, "Converting DataFrame to RDD")
        val rdd = dataFrameToRDD(data)
        log.log(Level.INFO, "Conversion finished; beginning classification")
        // Compute raw scores on the test set.
        val predictions = rdd.map(point => model.predict(point.features))
        log.log(Level.INFO, "Classification finished; transforming RDD to DataFrame")
        val sqlContext : SQLContext = Osint.spark.sqlContext
        // zip pairs each original row with its prediction (both RDDs were derived from `data`
        // through map-like transformations only, so their partitions line up),
        // and each pair is flattened into a single Row with the prediction appended.
        val tupleRDD = data.rdd.zip(predictions).map(t => Row.fromSeq(t._1.toSeq ++ Seq(t._2)))
        sqlContext.createDataFrame(tupleRDD, data.schema.add("predictions", "Double"))

        //TODO: this should work, but it doesn't, since the "withColumn" method seems to be applicable
        // only for adding new columns built from information in the same DataFrame;
        // that is why I am using the horrible RDD conversion above
        //val sqlContext : SQLContext = Osint.spark.sqlContext
        //import sqlContext.implicits._
        //val plDF = predictions.toDF("predictions")
        //data.withColumn("prediction", plDF.col("predictions"))
    }

    private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
        val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
        rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
    }
}
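For completeness, a minimal, hypothetical usage sketch (not part of my actual code): it assumes the Osint configuration object above is set up with SVM_STEPSIZE and SVM_C, and that the input DataFrame has an integer "label" column and an ML SparseVector "features" column, as expected by dataFrameToRDD.

object LinearSVMClassifierExample {
    def main(args: Array[String]): Unit = {
        val spark = org.apache.spark.sql.SparkSession.builder()
            .master("local[*]")
            .appName("linear-svm-example")
            .getOrCreate()
        import spark.implicits._

        // Toy data with the schema the classifier expects:
        // an Int "label" column and an ML (not MLlib) sparse "features" column.
        val df = Seq(
            (0, org.apache.spark.ml.linalg.Vectors.sparse(2, Array(0), Array(1.0))),
            (1, org.apache.spark.ml.linalg.Vectors.sparse(2, Array(1), Array(1.0)))
        ).toDF("label", "features")

        val classifier = new LinearSVMClassifier()
        classifier.train(df)            // relies on Osint.properties for SVM_STEPSIZE and SVM_C
        classifier.classify(df).show()  // original columns plus the appended "predictions" column

        spark.stop()
    }
}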