mapWithState gives java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter during checkpoint recovery

I am facing a Spark Streaming issue where I am trying to use a broadcast variable, mapWithState and checkpointing together.

Below is the usage:

  • Since I have to pass the connection object (which is not Serializable) to the executors, I use org.apache.spark.broadcast.Broadcast
  • Since I have to maintain some cached information, I am using stateful streams with mapWithState
  • I am also checkpointing my streaming context.

I also need to pass a broadcast connection object to mapWithState to fetch some data from an external source.

The stream works fine when the context is created newly. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.

I have placed a small piece of code, based on an example from asyncified.io, on GitHub to reproduce the issue:

  • My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
  • The dummy application logic is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala

Dummy code snippet:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-stateful-example")

...
val prop = new Properties()
...

val config: Config = ConfigFactory.parseString(prop.toString)
val sc = new SparkContext(sparkConf)
val ssc = StreamingContext.getOrCreate(checkpointDir, () =>  {

    println("creating context newly")

    clearCheckpoint(checkpointDir)

    val streamingContext = new StreamingContext(sc, Milliseconds(batchDuration))
    streamingContext.checkpoint(checkpointDir)

    ...
    val kafkaWriter = SparkContext.getOrCreate().broadcast(kafkaErrorWriter)
    ...
    val stateSpec = StateSpec.function((key: Int, value: Option[UserEvent], state: State[UserSession]) =>
        updateUserEvents(key, value, state, kafkaWriter)).timeout(Minutes(jobConfig.getLong("timeoutInMinutes")))

    kafkaTextStream
    .transform(rdd => {
        offsetsQueue.enqueue(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
        rdd
    })
    .map(deserializeUserEvent)
    .filter(_ != UserEvent.empty)
    .mapWithState(stateSpec)
    .foreachRDD { rdd =>
        ...
        some logic
        ...
    }

    streamingContext
})

ssc.start()
ssc.awaitTermination()


def updateUserEvents(key: Int,
                     value: Option[UserEvent],
                     state: State[UserSession],
                     kafkaWriter: Broadcast[KafkaWriter]): Option[UserSession] = {

    ...
    kafkaWriter.value.someMethodCall()
    ...
}


I am getting the following error when kafkaWriter.value.someMethodCall() is executed:

17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144)
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150)
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78)
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Basically, kafkaWriter is a broadcast variable and kafkaWriter.value should return the KafkaWriter instance we broadcast, but after recovery it returns a SerializableConfiguration that cannot be cast to the desired type.

Thanks in advance for your help!

1 answer


A broadcast variable cannot be used with mapWithState (or with transform operations in general) if we need to recover from the checkpoint directory in Spark Streaming. In that case it can only be used inside output operations, because re-creating the broadcast requires the Spark context and it must be lazily initialized, as in this example from the Spark Streaming programming guide:



class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaWordBlacklist.class) {
                if (instance == null) {
                    List<String> wordBlacklist = Arrays.asList("a", "b", "c");
                    instance = jsc.broadcast(wordBlacklist);
                }
            }
        }
        return instance;
    }
}

class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaDroppedWordsCounter.class) {
                if (instance == null) {
                    instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
                }
            }
        }
        return instance;
    }
}

wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast
    Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(wordCount -> {
        if (blacklist.value().contains(wordCount._1())) {
            droppedWordsCounter.add(wordCount._2());
            return false;
        } else {
            return true;
        }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
});
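
Applied to the question's code, the same pattern in Scala might look roughly like this. This is a minimal sketch, not the repo's actual code: buildKafkaWriter and the broadcast-free state spec are hypothetical placeholders, since the question does not show how KafkaWriter is constructed.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import yuvalitzchakov.utils.KafkaWriter

// Lazily (re)creates the broadcast on first use after a restart, instead of
// relying on the instance captured in the checkpointed mapWithState closure.
object KafkaWriterHolder {

  @volatile private var instance: Broadcast[KafkaWriter] = _

  def getInstance(sc: SparkContext, writer: => KafkaWriter): Broadcast[KafkaWriter] = {
    if (instance == null) {
      KafkaWriterHolder.synchronized {
        if (instance == null) {
          // writer is only evaluated here, on the driver, the first time
          // the broadcast is needed after this JVM started
          instance = sc.broadcast(writer)
        }
      }
    }
    instance
  }
}

...

kafkaTextStream
  .map(deserializeUserEvent)
  .filter(_ != UserEvent.empty)
  .mapWithState(stateSpec)   // state function no longer touches the broadcast
  .foreachRDD { rdd =>
    // foreachRDD is an output operation, so the broadcast can be lazily
    // re-initialized here even after recovery from the checkpoint
    val writer = KafkaWriterHolder.getInstance(rdd.sparkContext, buildKafkaWriter())
    rdd.foreachPartition { sessions =>
      sessions.foreach(_ => writer.value.someMethodCall())
    }
  }

With this arrangement updateUserEvents drops its Broadcast[KafkaWriter] parameter entirely, and the writer is only touched inside the output operation.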
