How to use Java 8 Date and Jackson classes with Spark?

I have a Spark 1.4.0 project where I am trying to parse multiple JSON records containing a timestamp field and store it in a ZonedDateTime, using Jackson and the JSR-310 module. If I run the driver program from the IDE (namely IntelliJ IDEA 14.0) it works correctly, but if I build the jar with sbt assembly and run it with spark-submit, I get the following exception:

15/07/16 14:13:03 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.AbstractMethodError: com.mycompany.input.EventParser$$anonfun$1$$anon$1.com$fasterxml$jackson$module$scala$experimental$ScalaObjectMapper$_setter_$com$fasterxml$jackson$module$scala$experimental$ScalaObjectMapper$$typeCache_$eq(Lorg/spark-project/guava/cache/LoadingCache;)V
    at com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper$class.$init$(ScalaObjectMapper.scala:50)
    at com.mycompany.input.EventParser$$anonfun$1$$anon$1.<init>(EventParser.scala:27)
    at com.mycompany.input.EventParser$$anonfun$1.apply(EventParser.scala:27)
    at com.mycompany.input.EventParser$$anonfun$1.apply(EventParser.scala:24)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


I have tried multiple versions of Jackson and Spark, but no luck. I guess it has something to do with a dependency conflict between Spark and my project (involving the Guava library, judging by the stack trace). Any ideas?

Thanks!

EDIT: An example project to reproduce the problem is here.
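For reference, the parsing code looks roughly like the sketch below. This is only a minimal reconstruction of the setup, not the actual project code: the Event case class, its fields, and the input path are placeholders.

import java.time.ZonedDateTime
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.datatype.jsr310.JSR310Module
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Placeholder record type with a Java 8 timestamp field.
case class Event(id: String, timestamp: ZonedDateTime)

val events = sc.textFile("events.json").map { line =>
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.registerModule(new JSR310Module()) // JavaTimeModule in Jackson 2.6+
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.readValue[Event](line)
}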





2 answers


I had a similar problem and solved it by changing 2 things:

1) I used ObjectMapper instead of ScalaObjectMapper, as suggested in a comment on this SO question: Error when starting a job on Spark 1.4.0 with Jackson module with ScalaObjectMapper

2) I needed to define the mapper inside the map operation:

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // Build the mapper inside the closure so it is created on the executor,
  // not serialized from the driver.
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})




If the mapper is defined outside the map, I got a NullPointerException. I also tried broadcasting it, but that didn't work either.
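If creating a mapper for every record is a concern, the same idea can be done once per partition with mapPartitions. This is just a sketch of a variation, not something I tested as part of the fix above:

val alertsData = sc.textFile(rawlines).mapPartitions { lines =>
  // One mapper per partition instead of one per record.
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  lines.map(alertStr => mapper.readValue(alertStr, classOf[Alert]))
}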

Also, there is no need to explicitly add Jackson as a dependency, since Spark already provides it.

Hope it helps.

Aliza





One thing that might help is upgrading to Jackson 2.5. The Jackson Scala module depended on Guava up to 2.4, but that dependency was removed in 2.5 (Guava remains a test-scoped dependency, but nothing is pulled in at runtime). This will at least eliminate the transitive Guava conflict with Spark.
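For example, in sbt the upgrade might look like this (a sketch only; the exact 2.5.x patch version shown is illustrative):

// build.sbt -- use a Guava-free 2.5.x release of the Jackson Scala module.
libraryDependencies ++= Seq(
  "com.fasterxml.jackson.module"   %% "jackson-module-scala"    % "2.5.3",
  "com.fasterxml.jackson.datatype" %  "jackson-datatype-jsr310" % "2.5.3"
)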











