Spark: Broadcast Jackson ObjectMapper
I have a Spark application that reads lines from files and tries to deserialize them using Jackson. For this code to work, I had to define the ObjectMapper inside the map operation (otherwise I got a NullPointerException).
I have the following code that works:
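import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // A new mapper is built for every line, on the executor
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})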
However, if I define the mapper outside the map and broadcast it, it fails with a NullPointerException.
This code doesn't work:
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)
val alertsData = sc.textFile(rawlines).map(alertStr => {
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})
What am I missing here?
Thanks, Aliza
It turns out you can broadcast the mapper. The problematic part was mapper.registerModule(DefaultScalaModule), which had to be done on every worker (executor) machine, not just on the driver.
So this code works:
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)
val alertsData = sc.textFile(rawlines).map(alertStr => {
  broadcastVar.value.registerModule(DefaultScalaModule)
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})
I optimized the code even further by running registerModule only once for each partition (not for each item in the RDD).
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)
val alertsData = alertsRawData.mapPartitions { iter: Iterator[String] =>
  broadcastVar.value.registerModule(DefaultScalaModule)
  for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert])
}
Aliza
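A possible alternative, offered only as a sketch and not as what the answer above does: skip the broadcast and keep the mapper in a singleton object behind a lazy val. Each executor JVM then builds and fully configures its own mapper on first use, so nothing Jackson-related is serialized from the driver, and registerModule is never called on a mapper that other tasks may already be reading with (Jackson expects a mapper to be fully configured before it is used). The names AlertMapper, AlertJob and parseAlerts are made up for this example, and the one-field Alert case class is only a stand-in, since the real class is not shown in the question.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Stand-in for the Alert class from the question (its real fields are not shown).
case class Alert(id: String)

// Hypothetical holder object, not part of the original code.
object AlertMapper {
  // lazy val: initialized on first access in each JVM, so the driver and
  // every executor each configure their own instance exactly once.
  lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m.registerModule(DefaultScalaModule)
    m
  }
}

// Hypothetical wrapper so the snippet compiles on its own.
object AlertJob {
  def parseAlerts(sc: SparkContext, rawlines: String): RDD[Alert] =
    sc.textFile(rawlines).map { alertStr =>
      // Refers to the singleton by name; no mapper is captured in the closure.
      AlertMapper.mapper.readValue(alertStr, classOf[Alert])
    }
}
```

This is meant for a compiled application submitted with spark-submit; objects defined interactively in spark-shell are wrapped differently and may not behave the same way.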