Spark Scala: catching exceptions with scala.util.control.Exception and dropping None values in map

I am writing a Spark application in Scala and want to handle a dirty input file.

// CSV file
val raw_data = sc.textFile(...)

val clean_data = raw_data.map(_.split(delimiter))
  .map(r => (r(0), r(1).toDouble))

This throws a NumberFormatException whenever r(1) is not a number, which happens on a small number of lines in the dirty input.

I finally landed on an ugly way to achieve what I need:

import scala.util.control.Exception._

val clean_data = raw_data.map(_.split(delimiter))
  .map(r => (r(0),
        catching(classOf[NumberFormatException]).opt(r(1).toDouble)))
  .filter( r => r._2 != None)
  .map( r => (r._1, r._2.get))

This leaves me with two questions.

1) What is the best way to simply discard the bad lines inside the map?

2) How can I handle the Option returned by catching without first filtering out the Nones explicitly and then mapping .get over the remaining values?

I tried adding a .flatMap(identity) step to get rid of the Nones, but got the error: expected: TraversableOnce[?].
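The flatMap(identity) attempt most likely fails because, after the first map, each element is a (String, Option[Double]) tuple, so identity returns the tuple itself rather than something flatMap can flatten. A minimal sketch that keeps catching(...).opt but wraps the whole pair in the Option, so a single flatMap drops the Nones. It uses a plain Scala Seq as a hypothetical stand-in for the RDD, with made-up sample rows; the same call shape should work on an RDD, assuming Option's implicit conversion to Iterable applies there as it does for Scala collections.

```scala
import scala.util.control.Exception._

object CatchingFlatMapSketch {
  // Hypothetical sample rows standing in for raw_data.map(_.split(delimiter)).
  val rows: Seq[Array[String]] = Seq(
    Array("a", "1.5"),
    Array("b", "oops"),
    Array("c", "2.0")
  )

  // Parse the second field; a NumberFormatException becomes None.
  // Rows with fewer than two fields would still throw
  // ArrayIndexOutOfBoundsException, because only NumberFormatException is caught.
  def parse(r: Array[String]): Option[(String, Double)] =
    catching(classOf[NumberFormatException]).opt(r(1).toDouble).map(v => (r(0), v))

  // flatMap over the Option keeps the Some values and drops the Nones,
  // replacing the filter(_ != None) + map(_.get) pair.
  val cleanData: Seq[(String, Double)] = rows.flatMap(r => parse(r))

  def main(args: Array[String]): Unit = println(cleanData)
}
```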

1 answer


In Spark, collect(pf: PartialFunction) is the counterpart of collect in the Scala collections, and it exists for exactly this purpose: it keeps only the elements for which the partial function is defined and maps those elements through it.

import scala.util.Try

val rawData = sc.textFile(...)

val cleanData = rawData.map(_.split(Delimiter))
             .collect { case Array(x, y) if Try(y.toDouble).isSuccess => (x, y.toDouble) }
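To see what the partial function does without a cluster, here is a sketch of the same collect on a local Seq, used as a hypothetical stand-in for the RDD with made-up rows. One detail worth noticing: the pattern Array(x, y) matches only rows with exactly two fields, so rows with extra or missing columns are dropped along with rows whose second field is not a number.

```scala
import scala.util.Try

object CollectSketch {
  // Hypothetical sample rows standing in for rawData.map(_.split(Delimiter)).
  val rows: Seq[Array[String]] = Seq(
    Array("a", "1.5"),
    Array("b", "oops"),
    Array("c", "2.0", "extra"),
    Array("d")
  )

  // collect keeps only elements for which the partial function is defined:
  // exactly two fields, and a second field that parses as a Double.
  val cleanData: Seq[(String, Double)] = rows.collect {
    case Array(x, y) if Try(y.toDouble).isSuccess => (x, y.toDouble)
  }

  def main(args: Array[String]): Unit = println(cleanData)
}
```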

Another option, which doesn't evaluate .toDouble twice, is to use flatMap:



val cleanData = rawData.map(_.split(Delimiter))
                       // entry is an Array[String]; keep the pair only if parsing succeeds
                       .flatMap(entry => Try((entry(0), entry(1).toDouble)).toOption)
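The same check for the flatMap version, again on a local Seq standing in for the RDD, with the same made-up rows. Because Try catches any non-fatal exception, a row that is too short (ArrayIndexOutOfBoundsException) is dropped too, while a row with extra fields is kept, which differs from the Array(x, y) pattern in the collect version.

```scala
import scala.util.Try

object FlatMapTrySketch {
  // Hypothetical sample rows standing in for rawData.map(_.split(Delimiter)).
  val rows: Seq[Array[String]] = Seq(
    Array("a", "1.5"),
    Array("b", "oops"),
    Array("c", "2.0", "extra"),
    Array("d")
  )

  // Each row is parsed once. Try turns NumberFormatException (bad number)
  // or ArrayIndexOutOfBoundsException (short row) into a Failure, and
  // toOption turns that Failure into None, which flatMap then drops.
  val cleanData: Seq[(String, Double)] =
    rows.flatMap(r => Try((r(0), r(1).toDouble)).toOption)

  def main(args: Array[String]): Unit = println(cleanData)
}
```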

Note: somewhat confusingly, Spark also has a parameterless collect() method, which returns the contents of an RDD to the driver.
