Spark Streaming error: Accumulator must be registered before send to executor
I am using Spark Streaming to do some statistical work. Here is my code:
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(60))
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters) // create a stream
val accum = sparkSession.sparkContext.longAccumulator("Total Count")
eventHubsStream.foreachRDD(rdd => {
  accum.add(rdd.count())
  SavetoStorage(accum) // save to storage
})
When I run the above program, I get a runtime error:
"Accumulator must be registered before send to executor"
I have already registered the accumulator here:
val accum = sparkSession.sparkContext.longAccumulator("Total Count")
Why am I getting this error?
Thanks.