How to use mapPartitions in Scala?

I am trying to use mapPartitions in Scala, but I am getting the following error.

[error]  found   : Unit
[error]  required: Iterator[?]
[error] Error occurred in an application involving default arguments.
[error]         rdd.mapPartitions(showParts)

      

I am calling mapPartitions like this.

rdd.mapPartitions(showParts)

      

The function showParts is defined as follows.

def showParts(iter: Iterator[(Long, Array[String])]) = 
{ 
  while (iter.hasNext)
  {
    val cur = iter.next;
    // Do something with cur
  }
}

      

What is the correct way to use mapPartitions here?



2 answers


The problem is that the function you pass to mapPartitions must have the return type Iterator[U]. Your current code does not return anything, so its return type is inferred as Unit.

If you want to get an empty RDD after calling mapPartitions, you can do the following:



def showParts(iter: Iterator[(Long, Array[String])]) = 
{ 
  while (iter.hasNext)
  {
    val cur = iter.next;
    // Do something with cur
  }

  // return Iterator[U]
  Iterator.empty 
}
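
If you want to keep a result for each record instead of an empty RDD, the same contract applies: return an iterator built from the input. The following is only a sketch. The name describeParts and the string it produces are hypothetical, since the question does not say what "do something with cur" is, and a plain Scala iterator stands in for one partition so that the snippet runs without Spark.

```scala
// Hypothetical per-record transformation; the real "do something with cur"
// from the question is unknown.
def describeParts(iter: Iterator[(Long, Array[String])]): Iterator[String] =
  iter.map { case (id, fields) => s"$id:${fields.length}" }

// A plain Scala iterator standing in for a single partition (assumption:
// this only mimics what Spark would hand to the function).
val partition = Iterator((1L, Array("a", "b")), (2L, Array("c")))

val described = describeParts(partition).toList
println(described) // List(1:2, 2:1)
```

With a real RDD of the same element type, this would be passed as rdd.mapPartitions(describeParts). Because iter.map is lazy, the records are transformed as Spark consumes the returned iterator rather than inside a while loop.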

      



You need to return an Iterator from your function showParts. For example:



def onlyEven(numbers: Iterator[Int]) : Iterator[Int] = 
  numbers.filter(_ % 2 == 0)

def partitionSize(numbers: Iterator[Int]) : Iterator[Int] = 
  Iterator.single(numbers.length)

val rdd = sc.parallelize(0 to 10)
rdd.mapPartitions(onlyEven).collect()
// Array[Int] = Array(0, 2, 4, 6, 8, 10)

rdd.mapPartitions(partitionSize).collect()
// Array[Int] = Array(2, 3, 3, 3)  (one entry per partition; 4 partitions here)
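
To see why these outputs come out as they do, here is a rough sketch that applies the same two functions to hand-made partitions without Spark. The partition layout and the helper runOnPartitions are assumptions for illustration: the layout is chosen to match the four sizes shown above, and the helper only mimics mapPartitions followed by collect().

```scala
def onlyEven(numbers: Iterator[Int]): Iterator[Int] =
  numbers.filter(_ % 2 == 0)

def partitionSize(numbers: Iterator[Int]): Iterator[Int] =
  Iterator.single(numbers.size)

// Assumed layout: 0 to 10 split into four chunks of sizes 2, 3, 3, 3.
val partitions: Seq[Seq[Int]] = Seq(0 to 1, 2 to 4, 5 to 7, 8 to 10)

// Hypothetical helper: call f once per partition with that partition's
// iterator, then concatenate the results in partition order.
def runOnPartitions[U](f: Iterator[Int] => Iterator[U]): Seq[U] =
  partitions.flatMap(p => f(p.iterator))

val evens = runOnPartitions(onlyEven)
val sizes = runOnPartitions(partitionSize)
println(evens) // List(0, 2, 4, 6, 8, 10)
println(sizes) // List(2, 3, 3, 3)
```

The function is called once per partition, not once per element, which is why partitionSize yields one number for each partition.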

      
