How do I add the line number to each line?

Let's say this is my data:

‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.
‘Map’ is responsible to read data from input location.
it will generate a key value pair.
that is, an intermediate output in local machine.
’Reducer’ is responsible to process the intermediate.
output received from the mapper and generate the final output.

      

I want to add a number to each line, like this:

1,‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.
2,‘Map’ is responsible to read data from input location.
3,it will generate a key value pair.
4,that is, an intermediate output in local machine.
5,’Reducer’ is responsible to process the intermediate.
6,output received from the mapper and generate the final output.

      

and then save the result to a file.

I tried:

object DS_E5 {
  def main(args: Array[String]): Unit = {

    var i=0
    val conf = new SparkConf().setAppName("prep").setMaster("local")
    val sc = new SparkContext(conf)
    val sample1 = sc.textFile("data.txt")
    for(sample<-sample1){
      i=i+1
      val ss=sample.map(l=>(i,sample))
      println(ss)
    }
 }
}

      

but its output looks like this:

Vector((1,‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.))
...

      

How can I change my code to generate the desired output?



2 answers


zipWithIndex is what you need here. It maps from RDD[T] to RDD[(T, Long)] by adding an index at the second position of the pair.

sample1
   .zipWithIndex()
   .map { case (line, i) => i.toString + ", " + line }

      



Or, using string interpolation (see the comment by @DanielC.Sobral):

sample1
    .zipWithIndex()
    .map { case (line, i) => s"$i, $line" }
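
Note that zipWithIndex starts counting at 0, while the desired output in the question starts at 1 and has no space after the comma. A minimal sketch of a complete job that covers both points and the "save to a file" part, assuming the same data.txt input; the object name NumberLines, the helper formatLine, and the output directory numbered-output are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object NumberLines {
  // Hypothetical helper: turns a 0-based index into a 1-based "n,line" string.
  def formatLine(line: String, index: Long): String = s"${index + 1},$line"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("prep").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      sc.textFile("data.txt")
        .zipWithIndex() // RDD[(String, Long)], index starts at 0
        .map { case (line, i) => formatLine(line, i) }
        .saveAsTextFile("numbered-output") // assumed path; by default it must not exist yet
    } finally {
      sc.stop()
    }
  }
}
```

saveAsTextFile writes a directory containing part files rather than a single file, so the numbered lines end up in numbered-output/part-00000 and so on.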

      



By calling val sample1 = sc.textFile("data.txt"), you create a new RDD.

If you only want the output, you can try using the following code:

sample1.zipWithIndex().foreach(f => println(f._2 + ", " + f._1))



Basically, this code does the following:

  • .zipWithIndex() returns a new RDD[(T, Long)], where (T, Long) is a tuple, T is the element type of the previous RDD (java.lang.String, I believe), and Long is the index of the element in the RDD.
  • You have completed the transformation; now you need to perform an action. foreach fits very well in this case: it applies your function to every element in the current RDD, so we just call println on a quickly formatted string.
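
The transformation/action split described above can be sketched as follows. This is only an illustration, assuming local mode and the question's data.txt; the object name PrintNumbered and the helper render are hypothetical. On a cluster, println inside RDD.foreach runs on the executors, so this sketch collects the (small) result to the driver before printing:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PrintNumbered {
  // Hypothetical helper: formats one (line, index) pair as "index, line".
  def render(pair: (String, Long)): String = s"${pair._2}, ${pair._1}"

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("prep").setMaster("local"))
    try {
      // Transformation: describes the indexed RDD. Spark may still run a small
      // job here to count partition sizes when the RDD has several partitions.
      val indexed = sc.textFile("data.txt").zipWithIndex()

      // Action: collect() runs the job and brings all pairs to the driver.
      // Only reasonable when the data fits in driver memory.
      indexed.collect().foreach(pair => println(render(pair)))
    } finally {
      sc.stop()
    }
  }
}
```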