Spark Streaming stateful word count
I am learning Spark and trying to create a simple streaming service.
For example, I have a Kafka queue and a Spark word-count job. That example works in stateless mode. I would like to accumulate the word counts, so that if test was sent multiple times in different messages, I could get the total of all its occurrences.
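For example, across two batches I would like to see something like this (made-up input and the output I am after):
// batch 1: message "test foo"  ->  (test, 1), (foo, 1)
// batch 2: message "test"      ->  (test, 2)   // running total, not a fresh (test, 1)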
Using other examples like StatefulNetworkWordCount, I tried to change my Kafka streaming service:
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint("/tmp/data")
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
// Update the cumulative count using mapWithState
// This will give a DStream made of state (which is the cumulative count of the words)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateDstream = wordDstream.mapWithState(
  StateSpec.function(mappingFunc) /*.initialState(initialRDD)*/)
stateDstream.print()
stateDstream.map(s => (s._1, s._2.toString)).foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0))
// Start the computation
ssc.start()
ssc.awaitTermination()
I am getting many errors like this one:
17/03/26 21:33:57 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2b680207)
- field (class: com.DirectKafkaWordCount$$anonfun$main$2, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.DirectKafkaWordCount$$anonfun$main$2, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
although the stateless version works without errors:
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _).map(s => (s._1, s._2.toString))
wordCounts.print()
wordCounts.foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0))
// Start the computation
ssc.start()
ssc.awaitTermination()
The question is: how do I do stateful word count streaming?
In this line:
ssc.checkpoint("/tmp/data")
you have enabled checkpointing, which means that everything in your
wordCounts.foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0))
must be serializable, but sc itself is not, as you can see from the error message:
object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2b680207)
Removing the checkpointing line would take care of this.
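Note, however, that mapWithState itself requires checkpointing to be enabled, so for the stateful job you probably cannot just drop ssc.checkpoint. One way to keep the checkpoint and still pass the serialization check is to avoid capturing the outer sc in the closure and take the context from the RDD itself; a minimal sketch, assuming the spark-redis implicits (import com.redislabs.provider.redis._) work on whatever SparkContext they are given:
stateDstream
  .map(s => (s._1, s._2.toString))
  .foreachRDD { rdd =>
    // rdd.sparkContext is looked up inside the closure, so the driver-side sc is not captured
    rdd.sparkContext.toRedisZSET(rdd, "word_count", 0)
  }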
Another way is to continuously compute your DStream into RDDs, or to write the data directly to Redis, something like:
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition(partition => RedisContext.setZset("word_count", partition, ttl, redisConfig))
}
RedisContext is a serializable object that does not depend on the SparkContext.
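If you would rather not depend on spark-redis internals such as RedisContext, the same idea works with a plain Jedis client on the stateful stream, opening one connection per partition on the executors; a sketch with placeholder connection settings:
import redis.clients.jedis.Jedis
stateDstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val jedis = new Jedis("localhost", 6379) // placeholder host and port
    partition.foreach { case (word, count) =>
      // mapWithState already emits the running total, so just overwrite the score
      jedis.zadd("word_count", count.toDouble, word)
    }
    jedis.close()
  }
}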