HDFS writes not working with the saveAsNewAPIHadoopFile method

I am using Spark 1.1.0 on CDH 5.2.0 and am trying to read from and write to HDFS.

I quickly realized that .textFile and .saveAsTextFile call the old Hadoop API and don't seem to be compatible with our version of HDFS.

  def testHDFSReadOld(sc: SparkContext, readFile: String){
    //THIS WILL FAIL WITH
    //(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
    //java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

    sc.textFile(readFile).take(2).foreach(println)
  }

  def testHDFSWriteOld(sc: SparkContext, writeFile: String){
    //THIS WILL FAIL WITH
    //(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
    //java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

    sc.parallelize(List("THIS","ISCOOL")).saveAsTextFile(writeFile)
  }
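
A quick way to sanity-check whether the Hadoop client bundled with the Spark assembly matches the cluster (a minimal sketch of my own, not part of the failing methods; CDH 5.2.0 is based on Hadoop 2.5.x) is to print both versions from the driver:

  import org.apache.spark.SparkContext
  import org.apache.hadoop.util.VersionInfo

  def printVersions(sc: SparkContext): Unit = {
    //Spark's own version and the Hadoop client version compiled into the assembly,
    //to compare against what the CDH cluster is actually running
    println(s"Spark version: ${sc.version}")
    println(s"Hadoop client version: ${VersionInfo.getVersion}")
  }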


Moving to the new API methods fixed reading from HDFS!

  //Imports needed for the new-API read (mapreduce, not mapred, packages)
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

  def testHDFSReadNew(sc: SparkContext, readFile: String){
    //THIS WORKS
    sc.newAPIHadoopFile(readFile, classOf[TextInputFormat], classOf[LongWritable],
      classOf[Text], sc.hadoopConfiguration).map {
      case (x: LongWritable, y: Text) => y.toString
    }.take(2).foreach(println)
  }


So it seemed like I was making progress. The write job no longer failed with a hard error like the one above; instead it appeared to be working. The only problem is that there was nothing in the output directory besides the single _SUCCESS file. Further puzzling was that the logs showed data being written to the directory. It seems the committer never moved the files from the _temporary directory to the output directory.

  //Imports needed for the new-API write
  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

  def testHDFSWriteNew(sc: SparkContext, writeFile: String){
    /*This used to log errors like:
    INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dl1rhd400.internal.edmunds.com,35927)
    14/11/21 02:02:27 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2281f1b2
    14/11/21 02:02:27 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2281f1b2
    java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

    Lately it hasn't even had errors; the symptom is simply no part files in the
    output directory, although the _SUCCESS flag is there.
    */
    val conf = sc.hadoopConfiguration
    conf.set("mapreduce.task.files.preserve.failedtasks", "true")
    conf.set("mapred.output.dir", writeFile)
    sc.parallelize(List("THIS", "ISCOOL")).map(x => (NullWritable.get, new Text(x)))
      .saveAsNewAPIHadoopFile(writeFile, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf)
  }
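
To see whether the part files are actually stranded under _temporary, a small diagnostic like this can be run right after the save (a sketch of my own using the plain HDFS FileSystem API, not part of the original job):

  import org.apache.spark.SparkContext
  import org.apache.hadoop.fs.{FileSystem, Path}

  def dumpOutputDir(sc: SparkContext, writeFile: String): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    //Everything the committer left at the top level of the output path
    fs.listStatus(new Path(writeFile)).foreach(status => println(status.getPath))
    //If the commit never happened, task output should still be under _temporary
    val tmp = new Path(writeFile, "_temporary")
    if (fs.exists(tmp)) {
      println("_temporary still present, contents:")
      fs.listStatus(tmp).foreach(status => println(status.getPath))
    }
  }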


When I run locally and specify HDFS paths, the files show up in HDFS. The problem only occurs when I run against my Spark standalone cluster.

I am submitting as follows: spark-submit --deploy-mode client --master spark://sparkmaster --class driverclass driverjar
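
For reference, the driver is wired up roughly like this (a minimal sketch; the object name and argument handling are placeholders, not the actual driver class):

import org.apache.spark.{SparkConf, SparkContext}

//Placeholder for the driver class passed to --class above
object DriverClass {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hdfs-read-write-test")
    val sc = new SparkContext(conf)
    testHDFSReadNew(sc, args(0))   //HDFS path to read, as defined above
    testHDFSWriteNew(sc, args(1))  //HDFS path to write, as defined above
    sc.stop()
  }
}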





1 answer


Can you try the following code?

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/data/newAPIHadoopFile")
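
If that write succeeds, reading the directory back from the same shell is an easy check (my addition; the path matches the example above):

val readBack = sc.textFile("/data/newAPIHadoopFile")
readBack.collect().foreach(println)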


The following code also worked for me.



val x = sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
x.saveAsNewAPIHadoopFile("/data/nullwritable", classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], sc.hadoopConfiguration)


[root@sparkmaster ~]# hadoop fs -cat /data/nullwritable/*

15/08/20 02:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable





