Fix hdf entries not working with saveAsNewAPIHadoopFile method
I am using Spark 1.1.0 on CDH 5.2.0 and am trying so that I can read and write to hdfs.
I quickly realized that .textFile and .saveAsTextFile are calling the old api and don't seem to be compatible with our version of hdfs.
def testHDFSReadOld(sc: SparkContext, readFile: String){
//THIS WILL FAIL WITH
//(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
//java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)
sc.textFile(readFile).take(2).foreach(println)
}
def testHDFSWriteOld(sc: SparkContext, writeFile: String){
//THIS WILL FAIL WITH
//(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
//java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)
sc.parallelize(List("THIS","ISCOOL")).saveAsTextFile(writeFile)
}
Moving to new API methods fixed read from hdfs!
def testHDFSReadNew(sc: SparkContext, readFile: String){
//THIS WORKS
sc.newAPIHadoopFile(readFile, classOf[TextInputFormat], classOf[LongWritable],
classOf[Text],sc.hadoopConfiguration).map{
case (x:LongWritable, y: Text) => y.toString
}.take(2).foreach(println)
}
So it seemed like I was making progress. The email no longer came out with a hard error like above, instead it turned out to be working. The only problem is that there was nothing in the directory besides the single file SUCCESS. Further puzzling was that the logs showed that data was being written to the directory. It just seems like the committer never realized to move the files from the _temporary directory to the output directory.
def testHDFSWriteNew(sc: SparkContext, writeFile: String){
/*This will have an error message of:
INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dl1rhd400.internal.edmunds.com,35927)
14/11/21 02:02:27 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2281f1b2
14/11/21 02:02:27 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2281f1b2
java.nio.channels.CancelledKeyException
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
However lately it hasn't even had errors, symptoms are no part files in the directory but a success flag is there
*/
val conf = sc.hadoopConfiguration
conf.set("mapreduce.task.files.preserve.failedtasks", "true")
conf.set("mapred.output.dir", writeFile)
sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
.saveAsNewAPIHadoopFile(writeFile, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf)
}
When I run locally and specify hdfs paths, the files show up in hdf. This only happens when I start my spark standalone cluster.
I am applying as follows: spark-submit - client -deploy-mode -master spark: // sparkmaster -class driverclass driverjar
source to share
Can you try the following code?
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/data/newAPIHadoopFile")
The following code also worked for me.
val x = sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
x.saveAsNewAPIHadoopFile("/data/nullwritable", classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], sc.hadoopConfiguration)
[root @sparkmaster ~] # hadoop fs -cat / data / nullwritable / *
15/08/20 02:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
source to share