Spark: Broadcast Jackson ObjectMapper

I have a Spark application that reads lines from files and tries to deserialize them using Jackson. For this code to work, I needed to define an ObjectMapper inside the map operation (otherwise I got a NullPointerException).

I have the following code that works:

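import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// A new mapper is built for every line, inside the task running on the executor
val alertsData = sc.textFile(rawlines).map(alertStr => {
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})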


However, if I define the mapper outside the map and broadcast it, the job fails with a NullPointerException.

This code doesn't work:

// Mapper is configured once on the driver and then broadcast to the executors
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})


What am I missing here?

Thanks, Aliza



2 answers


It turns out you can broadcast the mapper. The problematic part was mapper.registerModule(DefaultScalaModule), which had to be done on every worker (executor) machine, not just on the driver.

So this code works:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
// Broadcast the mapper without the Scala module registered
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // Register the Scala module on the executor's copy (here, once per record)
  broadcastVar.value.registerModule(DefaultScalaModule)
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})
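A common alternative to broadcasting the mapper at all (a different technique from the one above, not something the original poster tested) is to keep one fully configured mapper per executor JVM in a Scala `object`. This is a sketch assuming jackson-databind and jackson-module-scala are on the classpath; the `Alert` fields and the `AlertJson` name are hypothetical. The Jackson documentation describes ObjectMapper as thread-safe once all configuration is done before the first read or write, which is why one instance built at object initialisation can serve every task in the JVM.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical record type; the question does not show the real Alert class.
case class Alert(id: String, severity: Int)

// Holder for one mapper per JVM. A top-level Scala `object` is referenced from a
// task closure rather than captured by it, so the mapper is never serialized;
// each executor initialises its own copy the first time the object is accessed.
object AlertJson {
  val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m.registerModule(DefaultScalaModule) // runs once, when the object is initialised
    m
  }

  // All configuration is finished above, so concurrent reads are safe.
  def parse(line: String): Alert = mapper.readValue(line, classOf[Alert])
}
```

In the job this would be used as `val alertsData = sc.textFile(rawlines).map(line => AlertJson.parse(line))`, with no broadcast variable involved.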




I optimized the code further by calling registerModule only once per partition (rather than once per item in the RDD):

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)

val alertsData = alertsRawData.mapPartitions { iter: Iterator[String] =>
  // Runs once per partition, before any record of that partition is read
  broadcastVar.value.registerModule(DefaultScalaModule)
  for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert])
}
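The once-per-partition idea can be factored into a small generic helper. This is a sketch with a hypothetical name (`PartitionSetup.withSetup`), written in plain Scala so it can be checked without a cluster; inside `mapPartitions`, the iterator it receives corresponds to one partition.

```scala
object PartitionSetup {
  // Hypothetical helper: evaluate `setup` once for the whole iterator (one partition),
  // then apply `f` lazily to each element, reusing the same setup result.
  def withSetup[A, R, B](iter: Iterator[A])(setup: => R)(f: (R, A) => B): Iterator[B] = {
    val resource = setup // by-name argument forced exactly once per call
    iter.map(a => f(resource, a))
  }
}
```

With Spark it would wrap the partition iterator, e.g. `alertsRawData.mapPartitions(iter => PartitionSetup.withSetup(iter)(configuredMapper(broadcastVar.value))((m, line) => m.readValue(line, classOf[Alert])))`, where `configuredMapper` is a hypothetical function that registers the Scala module and returns the mapper. Because the result is still a lazy iterator, records are streamed rather than materialised per partition.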


Aliza



Indeed, ObjectMapper is not suitable for broadcasting. Basically, it is not serializable; it is not a value class. I would suggest broadcasting a DeserializationConfig instead and passing that to the ObjectMapper constructor from the broadcast variable in your map operation.
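I'm not aware of a public ObjectMapper constructor that accepts a DeserializationConfig, so the sketch below follows the spirit of this suggestion with a substitute: broadcast a small, serializable settings object and rebuild the mapper from it on the executor. `MapperSettings`, `MapperFactory` and the `Alert` fields are hypothetical; jackson-databind and jackson-module-scala are assumed to be on the classpath.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical record type; the question does not show the real Alert class.
case class Alert(id: String, severity: Int)

// Hypothetical settings holder: a plain case class is java.io.Serializable,
// so it can be broadcast in place of the ObjectMapper itself.
case class MapperSettings(failOnUnknownProperties: Boolean)

object MapperFactory {
  // Builds a fully configured mapper from the settings; intended to run on the executor.
  def build(settings: MapperSettings): ObjectMapper = {
    val m = new ObjectMapper()
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, settings.failOnUnknownProperties)
    m.registerModule(DefaultScalaModule)
    m
  }
}
```

In Spark, the settings would be broadcast with `val settingsBc = sc.broadcast(MapperSettings(failOnUnknownProperties = false))` and the mapper built once per partition: `sc.textFile(rawlines).mapPartitions { iter => val m = MapperFactory.build(settingsBc.value); iter.map(line => m.readValue(line, classOf[Alert])) }`.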
