Simple Apache Spark application not working

I have a problem with Apache Spark Streaming. I rewrote the simple word-count example to see how streaming works; here is the code:

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

    // Create a DStream that will connect to hostname:port, like localhost:9999
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(
      new FlatMapFunction<String, String>() {
        @Override public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      });

    // Map each word to a (word, 1) pair
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override public Tuple2<String, Integer> call(String s) throws Exception {
          return new Tuple2<String, Integer>(s, 1);
        }
      });

    // Sum the counts for each word in each batch
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override public Integer call(Integer i1, Integer i2) throws Exception {
          return i1 + i2;
        }
      });

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print();

    jssc.start();              // Start the computation
    jssc.awaitTermination();   // Wait for the computation to terminate

When I run this standalone app, the log shows the following:

14/10/08 13:16:44 INFO JobScheduler: Finished job streaming job 1412767004000 ms.0 from job set of time 1412767004000 ms
14/10/08 13:16:44 INFO JobScheduler: Total delay: 0.023 s for time 1412767004000 ms (execution: 0.019 s)
14/10/08 13:16:44 INFO ShuffledRDD: Removing RDD 428 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 428
14/10/08 13:16:44 INFO MappedRDD: Removing RDD 427 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 427
14/10/08 13:16:44 INFO FlatMappedRDD: Removing RDD 426 from persistence list
14/10/08 13:16:44 INFO BlockManager: Removing RDD 426
14/10/08 13:16:44 INFO BlockRDD: Removing RDD 425 from persistence list
14/10/08 13:16:44 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[425] at BlockRDD at ReceiverInputDStream.scala:69 of time 1412767004000 ms
14/10/08 13:16:44 INFO BlockManager: Removing RDD 425
14/10/08 13:16:44 INFO SocketReceiver: Stopped receiving
14/10/08 13:16:44 INFO SocketReceiver: Closed socket to localhost:9999
14/10/08 13:16:44 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to localhost:9999
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999: 
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Called receiver onStop
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 13:16:44 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopped receiver 0
14/10/08 13:16:45 INFO ReceiverTracker: Stream 0 received 0 blocks
14/10/08 13:16:45 INFO JobScheduler: Added jobs for time 1412767005000 ms
14/10/08 13:16:45 INFO JobScheduler: Starting job streaming job 1412767005000 ms.0 from job set of time 1412767005000 ms
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608
14/10/08 13:16:45 INFO DAGScheduler: Registering RDD 435 (map at MappedDStream.scala:35)
14/10/08 13:16:45 INFO DAGScheduler: Got job 217 (take at DStream.scala:608) with 1 output partitions (allowLocal=true)
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 433(take at DStream.scala:608)
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 434)
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List()
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=23776, maxMem=277842493
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_217 stored as values in memory (estimated size 2.2 KB, free 264.9 MB)
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 433.0 with 1 tasks
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 433.0 (TID 217, localhost, PROCESS_LOCAL, 1008 bytes)
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 433.0 (TID 217)
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 433.0 (TID 217). 822 bytes result sent to driver
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 433.0 (TID 217) in 4 ms on localhost (1/1)
14/10/08 13:16:45 INFO DAGScheduler: Stage 433 (take at DStream.scala:608) finished in 0.006 s
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 433.0, whose tasks have all completed, from pool 
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.009386933 s
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608
14/10/08 13:16:45 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 108 is 82 bytes
14/10/08 13:16:45 INFO DAGScheduler: Got job 218 (take at DStream.scala:608) with 1 output partitions (allowLocal=true)
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 435(take at DStream.scala:608)
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 436)
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List()
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=26032, maxMem=277842493
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_218 stored as values in memory (estimated size 2.2 KB, free 264.9 MB)
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 435.0 with 1 tasks
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 435.0 (TID 218, localhost, PROCESS_LOCAL, 1008 bytes)
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 435.0 (TID 218)
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 435.0 (TID 218). 822 bytes result sent to driver
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 435.0 (TID 218) in 3 ms on localhost (1/1)
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 435.0, whose tasks have all completed, from pool 
14/10/08 13:16:45 INFO DAGScheduler: Stage 435 (take at DStream.scala:608) finished in 0.003 s
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.008348754 s
-------------------------------------------
Time: 1412767005000 ms
-------------------------------------------

In the web interface I see the following:

[screenshot]

Obviously, typing a few words into netcat -lk 9999 does nothing.

Can someone help me figure out how this example works?

Thanks.


2 answers


As stated in the comment, first start a listener in one console:

    nc -lk 9999

Then, in another console, run the following command from inside the Spark folder:

    bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999

Now type some words in the console where nc is running:

    It is working! Life is beautiful!

and check the output in the console where the Spark example is running:

    (beautiful!,1)
    (working!,1)
    (is,2)
    (It,1)
    (Life,1)

If you keep adding words, the program keeps counting them in each new batch. Hope it helps.
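To see where those counts come from, here is a minimal plain-Java sketch of the same three steps (split, map to (word, 1), sum by key) applied to a single line. It does not use Spark at all; the class name WordCountSketch and its count method are made up for illustration, and unlike the streaming code it skips empty tokens.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark: mimics the streaming job's
// split -> (word, 1) -> sum-by-key steps for one line of input.
class WordCountSketch {

    // Counts space-separated words in a single line.
    static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : line.split(" ")) {       // same split as the flatMap step
            if (word.isEmpty()) {
                continue;                            // deviation from the original: ignore empty tokens
            }
            counts.merge(word, 1, Integer::sum);     // (word, 1) pairs reduced with i1 + i2
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count("It is working! Life is beautiful!");
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

The pairs match the output above; the order in which Spark prints them may differ.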



You need to open a port for communication on the local machine with this command:

    nc -lk 9999

Spark will then receive all text typed into that console (port 9999) as a stream.
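If nc is not at hand, a small Java program can play roughly the same role. The following is only a sketch of what the listener does here (accept a connection on a port and forward typed lines to it), not a replacement for nc; the class name LineServerSketch and the method serveOnce are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -lk 9999": listens on a port and sends
// every line read from an input source to the connected client.
class LineServerSketch {

    // Waits for one client, then forwards lines from `input` until the
    // input ends or writing to the client fails.
    static void serveOnce(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line + "\n");
                out.flush();
                if (out.checkError()) {
                    break;  // the client (e.g. the Spark receiver) has gone away
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 is the one used in the question; override it with the first argument.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        BufferedReader stdin = new BufferedReader(
            new InputStreamReader(System.in, StandardCharsets.UTF_8));
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                serveOnce(server, stdin);  // keep listening after a client disconnects, like -k
            }
        }
    }
}
```

Start it before the Spark application, then type lines into its console; the receiver connecting to localhost:9999 should get them as a stream.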
