Spark Streaming error: Accumulator must be registered before send to executor

I am using Spark to do some statistical work. Here is my code:

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(60))
    // create a stream
    val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)

    val accum = sparkSession.sparkContext.longAccumulator("Total Count")

    eventHubsStream.foreachRDD(rdd => {
        accum.add(rdd.count())
        SavetoStorage(accum) // save to storage
    })

      

When I run the above program, I get a runtime error:

"Accumulator must be registered before send to executor"

I have already registered the accumulator here:

    val accum = sparkSession.sparkContext.longAccumulator("Total Count")

      

Why am I getting this error?

Thanks.

+3




1 answer


This can be fixed by registering the accumulator with the SparkContext. In Spark 2.2 the following code works well:

    val sc = spark.sparkContext
    sc.register(accum, <Name_of_your_accumulator>);
    ... next actions with accumulator ...
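As a fuller sketch of the registration step above: the example below constructs a `LongAccumulator` directly, registers it once through `SparkContext.register`, and then updates it from a job. The local master, the app name, the helper `registeredCounter`, and the `parallelize` call standing in for one micro-batch of the stream are illustrative assumptions, not part of the original code.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object AccumulatorRegistration {
  // Hypothetical helper: build an accumulator and register it under `name`.
  def registeredCounter(sc: SparkContext, name: String): LongAccumulator = {
    // An accumulator constructed directly is not registered yet.
    val accum = new LongAccumulator
    // Register it with the driver's SparkContext before any task uses it.
    sc.register(accum, name)
    accum
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption).
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("accumulator-registration-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val accum = registeredCounter(sc, "Total Count")

    // Stand-in for one micro-batch of the stream (assumption).
    val rdd = sc.parallelize(1 to 100)
    // The closure captures the accumulator, so it is shipped to the executors.
    rdd.foreach(_ => accum.add(1L))

    println(s"Total Count = ${accum.value}")
    spark.stop()
  }
}
```

Note that Spark refuses to register the same accumulator twice, so this pattern is meant for accumulators you construct yourself; `sparkContext.longAccumulator(name)` creates and registers the accumulator in one call.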

      



I hope this will keep working in future releases.

+2

