Row-Level Error Handling in Scala Spark

I'm having trouble figuring out how to do row-level error handling with Scala Spark. In the code below I am reading a CSV text file, parsing it, and creating a Row using the mapSchema method (not shown; it basically takes the array of strings produced by the CSV parser and uses a schema to convert the strings to ints, doubles, dates, etc.). It works great when the data is all formatted correctly. However, if I have a bad line (for example, one with fewer fields than expected), I want to do some error handling.

val rddFull = sqlContext.sparkContext.textFile(csvPath).map {
  case(txt) =>
    try {
      val reader = new CSVReader(new StringReader(txt), delimiter, quote, escape, headerLines)
      val parsedRow = reader.readNext()
      Row(mapSchema(parsedRow, schema) : _*)
    } catch {
      case err: Throwable =>
        println("a record had an error: "+ txt)
        throw new RuntimeException("SomeError")
    }
}

The problem is that the try/catch expression doesn't seem to work. When I give it a bad line, I never get the RuntimeException "SomeError". Instead, I get the same error I get when I don't use try/catch.

Any ideas on what might be wrong here?



1 answer


You need to look in the right place for the logs. To begin with, the try/catch does work. Here's an example from the spark-shell:

val d = sc.parallelize(0 until 10)
val e = d.map { n =>
  try {
    if (n % 3 == 0) throw new IllegalArgumentException("That was a bad call")
    println(n)
  } catch {
    case e: IllegalArgumentException => throw new UnsupportedOperationException("converted from Arg to Op except")
  }
}
e.collect

Here is the result: notice that the exception was correctly caught and converted:



org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in
stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in   
stage 0.0 (TID 5, localhost): 
java.lang.UnsupportedOperationException: converted from Arg to Op except
    at $anonfun$1.apply$mcVI$sp(<console>:29)
    at $anonfun$1.apply(<console>:24)
    at $anonfun$1.apply(<console>:24)

      

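Try looking in the stderr logs of one or more of the workers. The code inside map runs on the executors, so anything it prints ends up in their logs rather than on the driver console.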
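If the goal is to keep processing when a record is malformed rather than abort the job, one option (not part of the answer above, just a minimal sketch) is to return the failure as data instead of throwing. The parseLine function below is a hypothetical stand-in for the CSVReader plus mapSchema step from the question, and the three-field (Int, String, Double) layout is an assumption made only for illustration. The example runs on a plain Seq so it can be pasted into any Scala REPL; the same function could be passed to map on an RDD of lines.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the question's CSVReader + mapSchema step.
// Assumed layout (illustration only): id: Int, name: String, score: Double.
def parseLine(line: String): Either[String, (Int, String, Double)] = {
  val fields = line.split(",", -1).map(_.trim)
  if (fields.length != 3) {
    // Too few or too many fields: report the problem instead of throwing.
    Left(s"expected 3 fields, got ${fields.length}: $line")
  } else {
    // Conversion errors (e.g. NumberFormatException) are captured by Try.
    Try((fields(0).toInt, fields(1), fields(2).toDouble)) match {
      case Success(row) => Right(row)
      case Failure(err) => Left(s"${err.getClass.getSimpleName}: $line")
    }
  }
}

val lines = Seq("1,alice,3.5", "2,bob", "x,carol,1.0")

// Each input line becomes either a parsed row or an error message.
val parsed = lines.map(parseLine)
val good = parsed.collect { case Right(row) => row }
val bad = parsed.collect { case Left(msg) => msg }
```

Here good holds the rows that parsed cleanly and bad holds one message per rejected line, so the bad records can be logged or written out separately instead of being lost in the worker logs.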
