How to use Java 8 date classes and Jackson with Spark?
I have a Spark 1.4.0 project where I am trying to parse multiple JSON records containing a timestamp field and store it in a ZonedDateTime, using Jackson and the JSR-310 module. If I run the driver program from the IDE (namely IntelliJ IDEA 14.0) it works correctly, but if I build with sbt assembly and run with spark-submit, then I get the following exception:
15/07/16 14:13:03 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.AbstractMethodError: com.mycompany.input.EventParser$$anonfun$1$$anon$1.com$fasterxml$jackson$module$scala$experimental$ScalaObjectMapper$_setter_$com$fasterxml$jackson$module$scala$experimental$ScalaObjectMapper$$typeCache_$eq(Lorg/spark-project/guava/cache/LoadingCache;)V
at com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper$class.$init$(ScalaObjectMapper.scala:50)
at com.mycompany.input.EventParser$$anonfun$1$$anon$1.<init>(EventParser.scala:27)
at com.mycompany.input.EventParser$$anonfun$1.apply(EventParser.scala:27)
at com.mycompany.input.EventParser$$anonfun$1.apply(EventParser.scala:24)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I have tried multiple versions of Jackson and Spark, but no luck. I suspect it has something to do with a dependency conflict between Spark and my project (involving the Guava library, judging by the stack trace). Any ideas?
Thanks!
EDIT: An example project reproducing the problem is here.
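For reference, the kind of setup described above looks roughly like this (a sketch; the Event class and field names are illustrative, not taken from the linked project):

```scala
import java.time.ZonedDateTime
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JSR310Module
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Illustrative record type with a Java 8 timestamp field
case class Event(id: String, timestamp: ZonedDateTime)

// Mixing in ScalaObjectMapper is the part that blows up on executors:
// its generated setter references a (shaded) Guava LoadingCache, which
// is what the AbstractMethodError in the stack trace points at.
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.registerModule(new JSR310Module()) // from jackson-datatype-jsr310
```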
I had a similar problem and solved it by changing two things:
1) I used ObjectMapper instead of ScalaObjectMapper, as suggested in a comment on this SO question: Error when starting a job on Spark 1.4.0 with Jackson module with ScalaObjectMapper
2) I defined the mapper inside the map operation:
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // Create the mapper on the executor, inside the map function
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})
If the mapper is defined outside the map function, I got a NullPointerException on the executors. I also tried broadcasting it, and that did not work either.
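If creating a mapper for every record is a concern, a commonly used alternative (a sketch of my own, not part of the original answer) is to create it once per partition with mapPartitions, which still keeps construction on the executor side:

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).mapPartitions { lines =>
  // One mapper per partition: constructed on the executor, so nothing
  // non-serializable needs to be shipped from the driver.
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  lines.map(line => mapper.readValue(line, classOf[Alert]))
}
```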
Also, there is no need to explicitly add Jackson as a dependency, since Spark already provides it.
Hope this helps.
Aliza
One thing that might help is upgrading to Jackson 2.5. The Jackson Scala module depended on Guava prior to 2.5; from 2.5 on, that dependency has been removed (Guava remains a test-scoped dependency, but nothing is needed at runtime). This will at least eliminate the transitive dependency conflict with Spark's Guava.
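In sbt terms, that upgrade would look something like the following (a sketch; the exact 2.5.x patch version is illustrative, and the whole Jackson family should be kept on the same version to avoid mixed-version errors):

```scala
// build.sbt: pin the Jackson artifacts to one 2.5.x version so the
// Scala module no longer pulls in Guava transitively.
libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core"     %  "jackson-databind"        % "2.5.3",
  "com.fasterxml.jackson.module"   %% "jackson-module-scala"    % "2.5.3",
  "com.fasterxml.jackson.datatype" %  "jackson-datatype-jsr310" % "2.5.3"
)
```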