Custom receiver in Spark Streaming produces no output

I am trying to write a Spark Streaming application with a custom receiver. I want to simulate real-time input by producing random values at a fixed interval. The (simplified) receiver looks like this, followed by the code for the Spark Streaming application:

import scala.util.Random
import scala.concurrent.duration._

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

class SparkStreamingReceiver extends Actor with ActorHelper {

  private val random = new Random()

  override def preStart() = {
    // The scheduler needs an implicit ExecutionContext.
    import context.dispatcher
    // Push a random value into the stream every second, starting after 500 ms.
    context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds) {
      self ! ("string", random.nextGaussian())
    }
  }

  override def receive = {
    case data: (String, Double) => store[(String, Double)](data)
  }
}

      

import akka.actor.Props
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
    .setMaster("local")

val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))

// Register the custom actor receiver as an input stream.
val randomValues: ReceiverInputDStream[(String, Double)] =
    ssc.actorStream[(String, Double)](Props(new SparkStreamingReceiver()), "Receiver")

randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")

ssc.start()
ssc.awaitTermination()

      

When I run this code, I can see that the receiver is working (I get log entries for each stored item). However, saveAsTextFiles never outputs any values.

I can work around the problem by changing the master to use two threads (local[2]), but the problem reappears as soon as I register a second instance of my receiver (which I intend to do). More precisely, I need at least one thread more than the number of registered custom receivers to get any output; the snippet below shows the only change I make.
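For reference, a minimal sketch of the workaround (the thread count is just an example for a single receiver):

conf.setMaster("local[2]") // one thread for the receiver, one left for batch processing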

It seems to me that the receivers are starving the worker threads.

Can anyone explain this effect and perhaps suggest how to fix my code?



1 answer


Each receiver occupies one compute slot (thread), so two receivers require two compute slots. If the receivers take all available compute slots, there is no slot left for processing the data. That is why "local" mode stops processing with one receiver, and "local[2]" stops processing with two receivers.
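As an illustration, a minimal sketch of the fix for local mode (the exact thread count is up to you, it just has to be greater than the number of receivers; two receivers are assumed here):

val conf = new SparkConf()
  .setAppName("Spark Streaming App")
  .setMaster("local[3]") // 2 threads for the 2 receivers + at least 1 for processing
// "local[*]" also works if the machine has more cores than receivers.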


