Flink HBase input for machine learning algorithms

I would like to use the Flink-HBase addon to read data that then serves as input to Flink's machine learning algorithms, specifically SVM and MLR. Right now I write the extracted data to a temporary file and then read it back with the libSVM reader, but I think there must be a more direct way.

Do you have a piece of code or an idea how to do this?



1 answer

There is no need to write the data to disk and then read it back with MLUtils.readLibSVM. The reason is as follows.

MLUtils.readLibSVM expects a text file in which each line is a sparse feature vector with a corresponding label. It uses the following format to represent a label-vector pair:

<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>


Here <feature> is the 1-based index of the subsequent value in the feature vector. MLUtils.readLibSVM reads files in this format and converts each line into a LabeledVector instance, so you get a DataSet[LabeledVector] after reading a libSVM file. This is exactly the input format required by the SVM and MultipleLinearRegression predictors.
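To make the format concrete, here is a minimal plain-Scala sketch of how a single libSVM line maps to a label plus 0-based (index, value) pairs. It needs no Flink dependency; the names LibSvmLineExample and parseLine are made up for illustration and are not part of Flink.

```scala
object LibSvmLineExample {

  /** Parses one libSVM-formatted line into (label, 0-based (index, value) pairs).
    * Returns None for blank lines and lines that contain only a comment.
    * Hypothetical helper for illustration, not a Flink API.
    */
  def parseLine(line: String): Option[(Double, Array[(Int, Double)])] = {
    // Everything after '#' is a comment
    val commentFree = line.takeWhile(_ != '#').trim

    if (commentFree.isEmpty) {
      None
    } else {
      val tokens = commentFree.split("\\s+")
      val coos = tokens.tail.map { entry =>
        val pair = entry.split(':')
        require(pair.length == 2, s"Malformed feature entry: $entry")
        // libSVM indices are 1-based, shift them to 0-based
        (pair(0).toInt - 1, pair(1).toDouble)
      }
      Some((tokens.head.toDouble, coos))
    }
  }

  def main(args: Array[String]): Unit = {
    parseLine("1 3:0.5 7:1.2 # example row").foreach { case (label, coos) =>
      println(s"label=$label entries=${coos.mkString(", ")}")
    }
  }
}
```

For the line `1 3:0.5 7:1.2 # example row` this yields the label 1.0 and two entries at the 0-based indices 2 and 6.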


However, depending on the format of the data you get from HBase, you first have to convert it to the libSVM format; otherwise MLUtils.readLibSVM will not be able to read the written file. And if you have to transform the data anyway, you can just as well convert it directly into a DataSet[LabeledVector] and use that as input to Flink's ML algorithms. This avoids an unnecessary round trip to disk.

If you obtain a DataSet[String] from HBase in which each line is in libSVM format (see the specification above), you can apply a flatMap operation to the HBase DataSet with the following function.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.SparseVector

// Name of the broadcast variable. The original snippet leaves DIMENSION
// undefined, so this value is an assumption.
val DIMENSION = "dimension"

val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if(commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2,
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM index is 1-based, but we expect it to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      // skip empty and comment-only lines
      None
    }
}

// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
  labelCOO =>
    labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))

val labeledVectors: DataSet[LabeledVector] =
  labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
  var dimension = 0

  override def open(configuration: Configuration): Unit = {
    // read the broadcast maximum dimension once per task
    dimension = getRuntimeContext.getBroadcastVariable[Int](DIMENSION).get(0)
  }

  override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
  }
}}.withBroadcastSet(dimensionDS, DIMENSION)


This converts your libSVM-formatted data into a DataSet[LabeledVector].
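To close the loop with the question, a rough sketch of feeding such a DataSet[LabeledVector] into the two learners could look like the following. It assumes the legacy flink-ml Scala library is on the classpath. The object TrainingSketch, the method train and all hyperparameter values are illustrative assumptions, not recommendations.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.regression.MultipleLinearRegression

// Hypothetical helper for illustration, not part of Flink
object TrainingSketch {

  /** Fits an SVM and a multiple linear regression on the same input. */
  def train(labeledVectors: DataSet[LabeledVector]): (SVM, MultipleLinearRegression) = {
    // Arbitrary example settings; tune them for real data
    val svm = SVM()
      .setBlocks(10)
      .setIterations(10)
    svm.fit(labeledVectors)

    val mlr = MultipleLinearRegression()
      .setIterations(10)
      .setStepsize(0.5)
    mlr.fit(labeledVectors)

    (svm, mlr)
  }
}
```

Note that the SVM implementation in flink-ml expects the labels to be +1 and -1, so the label column coming from HBase may need to be mapped accordingly before training.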



