Switching JavaStreamingContext from INITIALIZED to ACTIVE
I am using the example code "JavaKafkaWordCount.java" provided with Spark Streaming:
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        // Helper from the Spark examples project (same package)
        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the context with a 2-second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(Tuple2::_2);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
After the SparkConf object is created, the code creates a JavaStreamingContext. It then defines all the transformations required to compute the word counts and starts the JavaStreamingContext. After that, execution never returns to wordCounts.print(), and yet the counts keep printing. How is this possible? What happens when the JSSC switches from INITIALIZED to ACTIVE? Is it a loop, or what?
Internally, Spark Streaming uses a scheduler to execute all registered "output operations".
"Output operations" are operations that cause the declared stream transformations to materialize; as in core Spark, the transformations themselves are lazy. In the code in question, wordCounts.print(); is such an output operation: it registers with the Spark Streaming scheduler, which then executes it once every "batch interval". The batch interval is fixed when the streaming context is created; in the code above, new JavaStreamingContext(sparkConf, new Duration(2000)); sets it to 2000 ms, i.e. 2 seconds. Calling jssc.start() is what switches the context from INITIALIZED to ACTIVE and sets this scheduler in motion.
In a nutshell:
Every 2 seconds, Spark Streaming triggers an execution of wordCounts.print(), which in turn materializes the DStream with the data for that interval. Materialization applies all of the transformations defined on the DStream (and on the underlying RDDs), such as the map, flatMap and mapToPair operations in the code above.
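To see that the transformations alone do nothing, here is a hedged variation of the code above (a sketch, assuming the same words DStream is in scope). If print() were omitted and no other output operation were registered, Spark Streaming would refuse to start, failing with a "No output operations registered, so nothing to execute" error:

    // Hypothetical variation of the code above, to illustrate the laziness.
    // None of these lazy transformations runs until an output operation exists:
    JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(s -> new Tuple2<>(s, 1))    // lazy: only recorded, not executed
        .reduceByKey((i1, i2) -> i1 + i2);     // lazy: still nothing executed

    // print() is one output operation; foreachRDD is another. Either one
    // registers a job with the scheduler that runs once per batch interval:
    wordCounts.foreachRDD(rdd -> {
        // Runs on the driver every 2 seconds, one RDD per batch.
        System.out.println("Distinct words in this batch: " + rdd.count());
    });

Either output operation gives the scheduler a job to run, and that job is what pulls the whole chain of lazy transformations into execution each batch.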