Predicting on a live stream in Flink using Scala
Flink version: 1.2.0
Scala version: 2.11.8
I want to make predictions on streaming data with a model in Flink using Scala. I have a DataStream[String] that contains JSON-formatted data from a Kafka source, and I want to use this stream to make predictions with a Flink-ML model that is already trained. The problem is that all the Flink-ML examples use the DataSet API for prediction. I am fairly new to Flink and Scala, so any help in the form of a code solution would be appreciated.
Input:
{"FC196":"Dormant","FC174":"Yolo","FC195":"Lol","FC176":"4","FC198":"BANKING","FC175":"ABDULMAJEED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053570","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"","FC188":"BR01","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}
Code:
package org.apache.flink.quickstart

// imports
import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.parsing.json.JSON

// Kafka consumer imports
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Kafka JSON table imports
import org.apache.flink.table.examples.scala.StreamTableExample
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource
import org.apache.flink.api.java.DataSet

// json4s imports
import org.json4s.native.JsonMethods

// Case class matching the JSON fields
case class CC(FC196: String, FC174: String, FC195: String, FC176: String, FC198: String, FC175: String, FC197: String, FC178: String, FC177: String, FC199: String, FC179: String, FC190: String, FC192: String, FC191: String, FC194: String, FC193: String, FC203: String, FC205: String, FC185: String, FC184: String, FC187: String, FC186: String, FC189: String, FC200: String, FC188: String, FC202: String, FC201: String, FC181: String, FC180: String, FC183: String, FC182: String)

object WordCount {

  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {
    // Kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "group")
    properties.setProperty("auto.offset.reset", "earliest")

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // val tableEnv = TableEnvironment.getTableEnvironment(env)

    // read JSON strings from Kafka and parse them with json4s
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)

    // bind the parsed JSON to the case class
    val mapped = st.map(_.extract[CC])
    mapped.print()

    env.execute()
  }
}
The way to solve this problem is to write a MapFunction which reads the model on start-up. The MapFunction will then store the model as part of its internal operator state, so that it is automatically restored in case of a failure:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PredictionJob {

    public static void main(String[] args) throws Exception {
        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Value> input = ...; // read from Kafka for example

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();
    }

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;

        @Override
        public Prediction map(Value value) throws Exception {
            return model.predict(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // we don't have to do anything here because we assume the model to be constant
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {
                // restore the model from state
                model = modelState.get().iterator().next();
            } else {
                modelState.clear();

                // read the model from somewhere, e.g. read from a file
                model = ...;

                // update the modelState so that it is checkpointed from now on
                modelState.add(model);
            }
        }
    }

    // placeholder types; predict is a stub so the snippet type-checks
    public static class Model {
        public Prediction predict(Value value) {
            return new Prediction();
        }
    }

    public static class Value {}

    public static class Prediction {}
}
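Since the question is in Scala, here is a minimal sketch of the same pattern in Scala. It assumes a hypothetical Model class with a predict method and a loadModel() helper that reads the trained model (e.g. from a file); those names are illustrative placeholders, not part of Flink's API:
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// Placeholder types, as in the Java answer.
class Model extends Serializable {
  def predict(value: Value): Prediction = ???
}
class Value extends Serializable
class Prediction extends Serializable

class Predictor extends MapFunction[Value, Prediction] with CheckpointedFunction {

  @transient private var modelState: ListState[Model] = _
  @transient private var model: Model = _

  override def map(value: Value): Prediction = model.predict(value)

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // nothing to do: the model is assumed to be constant
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Model]("model", classOf[Model])
    modelState = context.getOperatorStateStore.getUnionListState(descriptor)

    if (context.isRestored) {
      // restore the model from state after a failure
      model = modelState.get().iterator().next()
    } else {
      modelState.clear()
      // hypothetical helper: load the trained model, e.g. from a file
      model = loadModel()
      // add it to the state so it is checkpointed from now on
      modelState.add(model)
    }
  }

  private def loadModel(): Model = ??? // application-specific
}
Plugged into the question's pipeline, the last step would then be something like mapped.map(new Predictor), with Value replaced by the CC case class.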