Flink HBase input for machine learning algorithms

I would like to use the Flink-HBase addon to read in data that then serves as the input for Flink's machine learning algorithms, namely SVM and MLR. Right now I first write the extracted data to a temporary file and then read it back in via the libSVM method, but I guess there should be a more elegant way.

Do you have a code snippet or an idea of how to do this?


1 answer


There is no need to write the data to disk and then read it back in with MLUtils.readLibSVM. The reason is the following:

MLUtils.readLibSVM expects a text file in which each line is a sparse feature vector with its associated label. It uses the following format to represent a label-vector pair:

<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>

      

where <feature> is the index of the subsequent <value> in the feature vector. MLUtils.readLibSVM can read files with this format and converts each line into a LabeledVector instance. Thus, after reading a libSVM file you obtain a DataSet[LabeledVector], which is exactly the input format required by the SVM and MultipleLinearRegression predictors.
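For reference, the disk-based route looks like this; a minimal sketch, assuming the flink-ml Scala API (MLUtils, LabeledVector) and a placeholder file path:

import org.apache.flink.api.scala._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.common.LabeledVector

val env = ExecutionEnvironment.getExecutionEnvironment

// An example of a valid libSVM line (label first, then 1-based <feature>:<value> pairs):
// 1.0 1:0.5 3:1.25 # optional comment
// "/tmp/data.libsvm" is a placeholder path
val data: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/tmp/data.libsvm")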



However, depending on the data format you get out of HBase, you might first have to convert the data into the libSVM format, because otherwise MLUtils.readLibSVM won't be able to read the written file. And if you have to transform the data anyway, you might as well convert it directly into a DataSet[LabeledVector] and use that as the input for Flink's ML algorithms. This spares you an unnecessary disk round trip.

Assuming that you get a DataSet[String] from HBase in which each line already has the libSVM format (see the specification above), you can apply the following flatMap operation to the HBase DataSet.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.SparseVector

// Name under which the dimension DataSet is broadcast; any unique string works
val DIMENSION = "dimension"

val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if(commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2, 
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM index is 1-based, but we expect it to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      None
    }
}

// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
  labelCOO =>
    labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))

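// Broadcast the computed dimension so that every parallel mapper
// builds SparseVectors of the same size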
val labeledVectors: DataSet[LabeledVector] = 
  labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
  var dimension = 0

  override def open(configuration: Configuration): Unit = {
    dimension = getRuntimeContext.getBroadcastVariable[Int](DIMENSION).get(0)
  }

  override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
  }
}}.withBroadcastSet(dimensionDS, DIMENSION)

      

This will convert your libSVM-formatted data into a DataSet[LabeledVector].
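From there, the result can be fed straight into the learners; a minimal sketch, again assuming the flink-ml Scala API with default parameters:

import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.regression.MultipleLinearRegression

// Train the SVM directly on the converted data; no temporary file involved
val svm = SVM()
svm.fit(labeledVectors)

// The same DataSet[LabeledVector] works for multiple linear regression
val mlr = MultipleLinearRegression()
mlr.fit(labeledVectors)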
