Flink HBase input for machine learning algorithms
I would like to use the Flink-HBase addon to read data, which then serves as input to Flink's machine learning algorithms, namely SVM and MLR. Right now I am writing the extracted data to a temporary file first and then reading it back in via the libSVM method, but I think there must be a more elegant way.
Do you have a code snippet or an idea of how to do this?
There is no need to write the data to disk and then read it back with MLUtils.readLibSVM. The reason is the following.

MLUtils.readLibSVM expects a text file in which each line is a sparse feature vector with its associated label. It uses the following format to represent a label-vector pair:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
where <feature> is the index of the subsequent feature <value> in the vector. MLUtils.readLibSVM can read files in this format and converts each line into a LabeledVector instance. Thus, after reading a libSVM file you obtain a DataSet[LabeledVector], and this is exactly the input format needed by the SVM and MultipleLinearRegression predictors.
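For reference, reading such a file is a one-liner; the environment setup and the file path below are only placeholders:

import org.apache.flink.api.scala._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.common.LabeledVector

val env = ExecutionEnvironment.getExecutionEnvironment

// A line in the file could look like: "1 1:0.5 3:1.25 7:0.1 # optional comment"
// readLibSVM parses every such line into a LabeledVector
val training: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/tmp/training.libsvm")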
However, depending on the data format you get from HBase, you might first have to convert the data into the libSVM format. Otherwise, MLUtils.readLibSVM will not be able to read the written file. And if you transform the data anyway, you can just as well convert it directly into a DataSet[LabeledVector] and use that as the input to Flink's ML algorithms. This avoids the unnecessary round trip over disk.
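For example, if your HBase scan already produced a DataSet[(Double, Array[Double])] of labels and dense feature arrays (a hypothetical shape; adapt it to whatever your input format actually emits), the conversion is a single map:

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

// Hypothetical HBase output: one (label, dense features) tuple per row
val hbaseTuples: DataSet[(Double, Array[Double])] = ???

// Build LabeledVectors directly; nothing is written to disk in between
val labeledData: DataSet[LabeledVector] = hbaseTuples.map {
  case (label, features) => LabeledVector(label, DenseVector(features))
}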
If what you get from HBase is a DataSet[String] in which each line has the libSVM format (see the specification above), then you can convert it into LabeledVectors by applying the following map operations to the HBase DataSet.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.SparseVector

// name under which the vector dimension is broadcast
val DIMENSION = "dimension"

val hbaseInput: DataSet[String] = ...

val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if (commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2,
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM indices are 1-based, but we expect them to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      None
    }
}

// calculate the maximum dimension of the vectors
val dimensionDS = labelCOODS.map {
  labelCOO => labelCOO._2.map(_._1 + 1).max
}.reduce(scala.math.max(_, _))

val labeledVectors: DataSet[LabeledVector] =
  labelCOODS.map {
    new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
      var dimension = 0

      override def open(configuration: Configuration): Unit = {
        dimension = getRuntimeContext.getBroadcastVariable[Int](DIMENSION).get(0)
      }

      override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
        new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
      }
    }
  }.withBroadcastSet(dimensionDS, DIMENSION)
This will convert your libSVM data into a DataSet[LabeledVector].
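From there, training is just a matter of calling fit; a minimal sketch, assuming the SVM predictor from flink-ml and placeholder parameter values:

import org.apache.flink.ml.classification.SVM

// Train the SVM on the converted data (parameter values are placeholders)
val svm = SVM()
  .setBlocks(10)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)

svm.fit(labeledVectors)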