fs.rename(new Path(rawFileName), new Path(processFileName)) doesn't work
I'm working on a Scala-based Apache Spark implementation that migrates data from a remote location to HDFS and then ingests the data from HDFS into Hive tables.
With my first Spark job, I have loaded the data/files into HDFS at a location, say:
hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/folder
Let's say that after loading the CT_Click_Basic.csv and CT_Click_Basic1.csv.gz files, I have the following files in HDFS [each file name from the shared location becomes a folder name here, and its contents are stored in the part-xxxxx files]:
[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/
Found 3 items
-rw-r--r--   3 chauhan.bhupesh hdfs          0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r--r--   3 chauhan.bhupesh hdfs       8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000
-rw-r--r--   3 chauhan.bhupesh hdfs       8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
Found 2 items
-rw-r--r--   3 chauhan.bhupesh hdfs          0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-r--r--   3 chauhan.bhupesh hdfs      16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000
Now, in my other Spark job, I want to move these files from the /raw folder to /process and finally to /archive in HDFS, depending on the tasks performed at each step.
To do this, I first build a list of all files under the /raw folder using the following code:
def listAllFilesFolderInDir(filePath: String, recursiveTraverse: Boolean, filePaths: ListBuffer[Path]): ListBuffer[Path] = {
  val files = GlobalContext.hdfs.listStatus(new Path(filePath))
  files.foreach { fileStatus =>
    if (!fileStatus.isDirectory()) {
      filePaths += fileStatus.getPath()
    } else {
      listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
    }
  }
  filePaths
}
Then, using the following lines of code, I try to rename/move the files from the /raw folder to the /process folder:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for (path <- pathArray) {
  var pathSplit = path.toString().split("/")
  var pathSplitSize = pathSplit.size
  val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  fs.rename(new Path(rawFileName), new Path(processFileName))
}
But I cannot move/rename these files with the code above. I debugged the code and found that fs.rename() returns false.
Please note that the rename/move works when I manually copy a file into the /data/analytics/raw folder, e.g. CT.csv [or any other file], and then run fs.rename(), but it does not work for the part-xxxxx files.
Is there something I am missing?
Any quick help would be appreciated.
Regards, Bhupesh
I finally found the problem. I was trying to rename the file from /data/analytics/raw/folder.csv/part-xxxxx to /data/analytics/process/folder.csv/part-xxxxx, where /data/analytics/process exists in HDFS but the "folder.csv" folder under it did not. Because the parent folder of the target was missing, the rename returned false. I added the following lines to my code to create the target folder first, and it worked fine for me:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for (path <- pathArray) {
  var pathSplit = path.toString().split("/")
  var pathSplitSize = pathSplit.size
  val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  // Create the target folder (e.g. /data/analytics/process/folder.csv) if it is missing;
  // without it, the rename below returns false.
  var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
  var processFolderPath = new Path(processFolderName)
  if (!fs.exists(processFolderPath))
    fs.mkdirs(processFolderPath)
  val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
  fs.rename(new Path(rawFileName), new Path(processFileName))
}
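The index arithmetic on the split path above assumes that every file sits exactly one folder below the input directory. As a rough sketch only, the target name could instead be derived from the file's location relative to the input directory. The object and method names here (PathMapping, toTargetPath) are hypothetical and not part of the original code, and the sketch assumes paths without characters that would need URI escaping:

```scala
import java.net.URI

object PathMapping {
  /** Maps a file under inputDir to the same relative location under outputDir.
    * Returns None when the file does not live under inputDir.
    * Hypothetical helper for illustration; it works on plain strings only. */
  def toTargetPath(file: String, inputDir: String, outputDir: String): Option[String] = {
    // Drop the scheme and authority (e.g. hdfs://host:8020) so only the path part is compared.
    val filePath = new URI(file).getPath
    val prefix = inputDir.stripSuffix("/") + "/"
    if (filePath.startsWith(prefix))
      Some(outputDir.stripSuffix("/") + "/" + filePath.substring(prefix.length))
    else
      None
  }
}
```

The result can be wrapped in new Path(...) before calling fs.mkdirs on its parent and fs.rename on the file itself.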
rename may return false if the source path, new Path(rawFileName), does not exist.
Before calling fs.rename, make sure the file exists:
if (fs.exists(somePath)) {
  fs.rename...
}
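Since fs.rename only reports false without saying why, it may help to check the usual preconditions before calling it. The following is a minimal sketch, not a complete list of failure causes. RenameCheck and diagnose are hypothetical names, and the existence test is passed in as a function so the logic can be tried without a cluster:

```scala
object RenameCheck {
  /** Returns a likely reason why renaming src to dst would fail,
    * or None if the basic preconditions hold.
    * `exists` is any function that tells whether a path is present. */
  def diagnose(src: String, dst: String, exists: String => Boolean): Option[String] = {
    // Parent folder of the destination; a top-level destination is treated as living under "/".
    val slash = dst.lastIndexOf('/')
    val dstParent = if (slash > 0) dst.substring(0, slash) else "/"

    if (!exists(src)) Some(s"source does not exist: $src")
    else if (!exists(dstParent)) Some(s"destination parent does not exist: $dstParent")
    // Assumption: a file is being renamed onto a file path, so an existing destination is a conflict.
    else if (exists(dst)) Some(s"destination already exists: $dst")
    else None
  }
}
```

With a Hadoop FileSystem instance fs, the predicate could be p => fs.exists(new Path(p)), and the returned message can be logged whenever fs.rename returns false.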
Another reason could be that the file you are trying to rename is in use by another process. Or, if you are renaming a directory, some of the files in it might be in use. To check whether this is the root cause, try renaming an unrelated test path instead:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for (path <- pathArray) {
  var pathSplit = path.toString().split("/")
  var pathSplitSize = pathSplit.size
  val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
  fs.rename(new Path("/TESTDIR1"), new Path("/TESTDIR2"))
}
If this rename succeeds, the root cause is indeed a race condition on the original files.