fs.rename(new Path(rawFileName), new Path(processFileName)) doesn't work

I'm working on a Scala-based Apache Spark implementation that migrates data from a remote location to HDFS and then ingests the data from HDFS into Hive tables.

Using my first Spark job, I have placed data/files in HDFS at a location, say:

hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/folder

Suppose that after loading the CT_Click_Basic.csv and CT_Click_Basic1.csv.gz files, I have the following files in HDFS [each file name becomes a folder name here, and its contents are stored in the part-xxxxx files]:

[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/
Found 3 items
-rw-r--r--   3 chauhan.bhupesh hdfs          0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r--r--   3 chauhan.bhupesh hdfs       8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000
-rw-r--r--   3 chauhan.bhupesh hdfs       8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
Found 2 items
-rw-r--r--   3 chauhan.bhupesh hdfs          0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-r--r--   3 chauhan.bhupesh hdfs      16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

Now, using my second Spark job, I want to move these files from the /raw folder to /process and finally to /archive in HDFS, depending on the tasks performed at each step.

To do this, I first compile a list of all files in the /raw folder using the following code:

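import org.apache.hadoop.fs.Path
import scala.collection.mutable.ListBuffer

// Collects every file under filePath into filePaths.
// Note: recursiveTraverse is passed along but never checked,
// so subdirectories are always traversed.
def listAllFilesFolderInDir(filePath: String, recursiveTraverse: Boolean, filePaths: ListBuffer[Path]): ListBuffer[Path] = {
  val files = GlobalContext.hdfs.listStatus(new Path(filePath))
  files.foreach { fileStatus =>
    if (!fileStatus.isDirectory()) {
      filePaths += fileStatus.getPath()
    } else {
      listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
    }
  }
  filePaths
}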

      

Then, using the following lines of code, I try to rename/move the files from the /raw folder to the /process folder:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }

      

But I cannot move/rename these files using the above code. I debugged the code and found that fs.rename() returns false.

Please note that rename/move works when I manually copy any file, e.g. CT.csv, into the /data/analytics/raw folder and then run fs.rename(), but it doesn't work for the part-xxxxx files.

Is there something I am missing?

Any quick help would be appreciated.

Regards, Bhupesh


2 answers


Finally I found the problem. I was trying to rename the file from /data/analytics/raw/folder.csv/part-xxxxx to /data/analytics/process/folder.csv/part-xxxxx, where /data/analytics/process existed in HDFS but the "folder.csv" subfolder did not; hence rename returned false. I added the following lines to my code, which create the target folder when it is missing, and it worked fine:



var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size

   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)

   var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
   var processFolderPath = new Path(processFolderName)
   if(!(fs.exists(processFolderPath)))
         fs.mkdirs(processFolderPath)
   val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }
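
The index arithmetic on the split path (pathSplitSize-2, pathSplitSize-1) only works for files exactly one folder below inputDir. As a minimal sketch, assuming plain string paths like the ones above, the target folder and target file can instead be derived by stripping the inputDir prefix. The names MovePlan, pathOnly and destinationFor are hypothetical and not part of the original code:

```scala
object MovePlan {
  /** Drops scheme and authority, e.g. hdfs://host:8020/a/b becomes /a/b. */
  def pathOnly(p: String): String = new java.net.URI(p).getPath

  /** Maps a source file under inputDir to (target folder, target file) under outputDir. */
  def destinationFor(source: String, inputDir: String, outputDir: String): (String, String) = {
    val in = inputDir.stripSuffix("/")
    val out = outputDir.stripSuffix("/")
    val src = pathOnly(source)
    require(src.startsWith(in + "/"), s"$src is not under $in")
    // Keep everything below inputDir, however deeply nested it is.
    val relative = src.stripPrefix(in + "/")
    val target = out + "/" + relative
    // The folder that must exist before rename can succeed.
    val folder = target.substring(0, target.lastIndexOf('/'))
    (folder, target)
  }
}
```

The first element of the result is the path that would be passed to fs.mkdirs, and the second is the destination for fs.rename.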

      


rename may return false if the source path, new Path(rawFileName), does not exist.
Before calling fs.rename, make sure the file exists:

if (fs.exists(new Path(rawFileName))) {
  fs.rename(new Path(rawFileName), new Path(processFileName))
}
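
Because fs.rename reports failure only as a bare false, it can help to check the likely causes first and report which one applies. The following is a rough sketch, assuming the Hadoop client library is on the classpath; RenameDiagnostics and renameOrExplain are hypothetical names, and the checks cover only some common causes rather than every reason a file system might refuse a rename:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameDiagnostics {
  /** Hypothetical helper: returns Left(reason) instead of a bare false. */
  def renameOrExplain(fs: FileSystem, src: Path, dst: Path): Either[String, Unit] = {
    val parent = dst.getParent // null when dst is the root path
    if (!fs.exists(src))
      Left(s"source does not exist: $src")
    else if (parent != null && !fs.exists(parent))
      Left(s"destination folder does not exist: $parent")
    else if (fs.exists(dst) && fs.getFileStatus(dst).isFile)
      Left(s"destination file already exists: $dst")
    else if (fs.rename(src, dst))
      Right(())
    else
      Left(s"rename returned false for $src -> $dst")
  }
}
```

In the loop above, the result could be logged for each path, so that a failed move shows up with a reason instead of being silently skipped.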

      

Another reason could be that the file you are trying to rename is in use by another process. Or, if you are renaming a directory, some files in it might be in use. To see whether this is the root cause, try renaming a different path, such as a test directory:



var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path("/TESTDIR1"), new Path("/TESTDIR2"))
 }

      

If this rename succeeds, the failure is specific to the original paths, which is consistent with those files being in use at the time (a race condition).
