Spark Scala: scala.util.control.Exception catching and dropping None in map
I am writing a Spark application in Scala and want to handle a dirty input file.
// CSV file
val raw_data = sc.textFile(...)
val clean_data = raw_data.map(_.split(delimiter))
  .map( r => (r(0), r(1).toDouble))
This will throw a NumberFormatException when r(1) is not a number. This happens on a small number of lines in the ugly input.
I finally landed on an ugly way to achieve what I need:
import scala.util.control.Exception._
val clean_data = raw_data.map(_.split(delimiter))
  .map( r => (r(0),
    catching(classOf[NumberFormatException]).opt(r(1).toDouble)))
  .filter( r => r._2 != None)
  .map( r => (r._1, r._2.get))
This leaves me with two questions.
1) What is the best way to just discard the wrong lines in the map?
2) How can I handle the Option type produced by catching without having to explicitly filter out None first and then map and apply the .get function on the non-None values?
I tried to apply a .flatMap(identity) step to get rid of the Nones, but got an "expected: TraversableOnce[?]" error.
In Spark, collect(pf: PartialFunction) is the twin brother of the Scala collections' collect and exists for exactly this purpose: it keeps only those elements for which the partial function is defined.
import scala.util.Try

val rawData = sc.textFile(...)
val cleanData = rawData.map(_.split(Delimiter))
  .collect { case Array(x, y) if Try(y.toDouble).isSuccess => (x, y.toDouble) }
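To try the semantics without a cluster, here is a minimal sketch on a plain Scala List instead of an RDD. The sample lines and the comma delimiter are made up for illustration; the assumption is that collect with a partial function treats each RDD element the same way the collections version does.

```scala
import scala.util.Try

// Hypothetical sample input: "b,oops" has a non-numeric value, "d" has no second field.
val lines = List("a,1.5", "b,oops", "c,3", "d")

// Keep only rows that split into exactly two fields and whose second field parses.
val clean: List[(String, Double)] = lines
  .map(_.split(","))
  .collect { case Array(x, y) if Try(y.toDouble).isSuccess => (x, y.toDouble) }

// clean == List(("a", 1.5), ("c", 3.0))
```

Rows that do not match the Array(x, y) pattern (too few or too many fields) are dropped along with the unparseable ones, because the partial function is not defined for them.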
Another option that doesn't evaluate .toDouble twice would be to use flatMap:
// A failed parse becomes None, which flatMap drops; a success becomes Some((key, value))
val cleanData = rawData.map(_.split(Delimiter))
  .flatMap(r => Try((r(0), r(1).toDouble)).toOption)
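The same flatMap idea can be sketched on a plain Scala List, once with Try and once with the catching(...).opt helper from the question. The sample lines and the comma delimiter are hypothetical, and a local List stands in for the RDD.

```scala
import scala.util.Try
import scala.util.control.Exception.catching

// Hypothetical sample input: "b,oops" has a non-numeric value, "d" has no second field.
val lines = List("a,1.5", "b,oops", "c,3", "d")
val rows = lines.map(_.split(","))

// Try catches any non-fatal exception, so the short row "d" is dropped as well.
val viaTry: List[(String, Double)] =
  rows.flatMap(r => Try((r(0), r(1).toDouble)).toOption)

// catching(...).opt only swallows the exception types that are listed explicitly.
val viaCatching: List[(String, Double)] =
  rows.flatMap { r =>
    catching(classOf[NumberFormatException], classOf[ArrayIndexOutOfBoundsException])
      .opt((r(0), r(1).toDouble))
  }

// Both results are List(("a", 1.5), ("c", 3.0))
```

The point in both variants is to build the whole tuple inside the Option, so that flatMap receives one Option per row. A row shaped like (String, Option[Double]) is a tuple rather than a collection, which is why flatMap(identity) is rejected on it.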
Note: it's a little confusing that Spark also has a parameterless collect method, which is meant for fetching the data of an RDD to the driver.