Switching JavaStreamingContext from INITIALIZED to ACTIVE

I am using the example code "JavaKafkaWordCount.java" that ships with Spark Streaming.

  

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaKafkaWordCount() {
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> lines = messages.map(Tuple2::_2);

    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }
}
      



After the SparkConf object is created, it creates the JavaStreamingContext. Then it defines all the functions required to execute the word count and starts the JavaStreamingContext. After that, execution never returns to wordCounts.print(), yet the counts keep being printed. How is this possible? What happens when the JSSC switches from INITIALIZED to ACTIVE? Is there a loop somewhere?





1 answer


Internally, Spark Streaming uses a scheduler to execute all registered "output operations".

"Output operations" are operations that cause the declared stream transformations to materialize; like in Spark, those transformations are lazy. In this specific case, wordCounts.print(); is such an output operation: it is registered with the Spark Streaming scheduler, which then executes it once every "batch interval". The batch interval is fixed when the streaming context is created. Going back to the code above, in new JavaStreamingContext(sparkConf, new Duration(2000)); the batch interval is 2000 ms, or 2 seconds.
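The scheduling behaviour can be sketched with a plain ScheduledExecutorService. This is only an analogy for Spark's internal job scheduler, not its actual implementation: start() hands the registered output operation to a timer that fires once per batch interval, and awaitTermination() merely blocks the calling thread while the timer does the work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchSchedulerSketch {
  // Runs `outputOp` once per `periodMs` until it has fired `batches` times,
  // then shuts the timer down. Returns the number of firings.
  static int runBatches(int batches, long periodMs, Runnable outputOp)
      throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch done = new CountDownLatch(batches);
    AtomicInteger fired = new AtomicInteger();
    scheduler.scheduleAtFixedRate(() -> {
      if (done.getCount() > 0) {
        outputOp.run();          // the registered "output operation"
        fired.incrementAndGet();
        done.countDown();
      }
    }, 0, periodMs, TimeUnit.MILLISECONDS);
    done.await();                // like jssc.awaitTermination(), but bounded here
    scheduler.shutdownNow();
    return fired.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // Fire a stand-in output operation every 100 ms (Spark's would be 2000 ms).
    int n = runBatches(3, 100, () -> System.out.println("batch fired"));
    System.out.println("fired " + n + " times");
  }
}
```

This is why main() appears to "stop" at awaitTermination(): the repeated work happens on the scheduler's own thread, not in main().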



In a nutshell: every 2 seconds, Spark Streaming triggers an execution of wordCounts.print(), which in turn materializes the evaluation of the DStream with the data for that interval. The materialization applies all transformations defined on the DStream (and the underlying RDDs), such as the map, flatMap, and mapToPair operations in this code.
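The same laziness can be observed with ordinary Java streams. This is an analogy, not the Spark API: the mapping function below does not run until a terminal operation, the analogue of an output operation like print(), forces evaluation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyEvalSketch {
  static final List<String> trace = new ArrayList<>();

  static List<String> run() {
    trace.clear();
    Stream<String> upper = Arrays.asList("to", "be").stream()
        .map(w -> { trace.add("map:" + w); return w.toUpperCase(); });
    // At this point `trace` is still empty: map() only declares the
    // transformation, just like DStream.map/flatMap/mapToPair.
    if (!trace.isEmpty()) throw new AssertionError("map ran eagerly");
    // The terminal operation materializes the pipeline, much as print()
    // materializes a DStream for one batch interval.
    return upper.collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(run());   // [TO, BE]
    System.out.println(trace);   // [map:to, map:be]
  }
}
```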









