Stopping Spark Streaming after reading the first batch of data
I am using Spark Streaming to consume Kafka messages. I want to get some messages as a sample from Kafka rather than reading all of them, so I want to read one batch of messages, return them to the caller, and stop the streaming. Currently I am passing the batchInterval time to the awaitTermination method of the streaming context. I don't understand how to return the processed data to the caller from the Spark stream. Here is the code that I am currently using:
import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// zkQuorum, group, topics, numThreads, sink, interval, size and sample
// are fields of the enclosing class
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
  if (params.contains("zookeeperQourum"))
    zkQuorum = params("zookeeperQourum")
  if (params.contains("userGroup"))
    group = params("userGroup")
  if (params.contains("topics"))
    topics = params("topics")
  if (params.contains("numberOfThreads"))
    numThreads = params("numberOfThreads")
  if (params.contains("sink"))
    sink = params("sink")
  if (params.contains("batchInterval"))
    interval = params("batchInterval").toInt

  val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
  val ssc = new StreamingContext(sparkConf, Seconds(interval))
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val consumerConfig = Map(
    "auto.offset.reset" -> "smallest",
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> group)
  val data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
  val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
  // Note: this closure runs on the executors, not on the driver,
  // so the driver never sees what gets appended to "sample" here
  streams.foreachRDD(rdd => rdd.foreachPartition(itr => {
    while (itr.hasNext && size >= 0) {
      val msg = itr.next()
      println(msg)
      sample.append(msg)
      sample.append("\n")
      size -= 1
    }
  }))
  ssc.start()
  ssc.awaitTermination(5000)
  ssc.stop(true)
}
So, instead of storing the messages in a StringBuilder called "sample", I want to return them to the caller.
We can get sample messages using the following code snippet:

val sampleMessages = streams.repartition(1).mapPartitions(x => x.take(10))
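Note that sampleMessages is still a DStream, so the sampled messages also have to be materialized on the driver before they can be returned to the caller. A minimal sketch (the driver-side buffer is an illustrative addition, not part of the original code):

import scala.collection.mutable.ArrayBuffer

val collected = ArrayBuffer.empty[String]
sampleMessages.foreachRDD { rdd =>
  // foreachRDD and collect() both run on the driver,
  // so appending to the local buffer is safe here
  collected ++= rdd.collect()
}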
If we want to stop after the first batch, we have to implement our own StreamingListener interface and stop the streaming in the onBatchCompleted method.
You can implement a StreamingListener and call ssc.stop() inside its onBatchCompleted method:
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

private class MyJobListener(ssc: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = synchronized {
    // stop the streaming context (and the underlying SparkContext)
    // as soon as the first batch has been processed
    ssc.stop(true)
  }
}
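One caveat (this may depend on your Spark version): calling ssc.stop() directly from the listener thread can block, because stop() waits for the listener bus to drain while that same thread is stuck inside the callback. A variant that triggers the stop from a separate thread avoids this:

private class MyJobListener(ssc: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // stop from a separate thread so the listener bus is not blocked
    new Thread(new Runnable {
      override def run(): Unit = ssc.stop(stopSparkContext = true)
    }).start()
  }
}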
This is how you attach the listener to the StreamingContext:
val listen = new MyJobListener(ssc)
ssc.addStreamingListener(listen)
ssc.start()
ssc.awaitTermination()
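Putting both answers together, a complete method could look like the following sketch (getSample, the buffer name, and the sample size of 10 are illustrative choices, not taken from the original code):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

def getSample(ssc: StreamingContext, streams: DStream[String]): Seq[String] = {
  val collected = ArrayBuffer.empty[String]
  // take a handful of messages from each batch and buffer them on the driver
  streams.foreachRDD { rdd =>
    collected ++= rdd.take(10)
  }
  // stop everything once the first batch has completed
  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = ssc.stop(stopSparkContext = true)
      }).start()
    }
  })
  ssc.start()
  ssc.awaitTermination() // returns once the listener has stopped the context
  collected.toSeq // the sampled messages, handed back to the caller
}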