How to flatten tuples in Spark?

I want to flatten the RDD of tuples (using a no-op map), but I get an error like:

val fromTuples = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().mkString(", "))


gives

error: type mismatch;
 found   : (Int, String)
 required: TraversableOnce[?]
       val flattened = fromTuples.flatMap(x => x)

The equivalent with a List of Lists or Arrays works fine, for example:

val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().mkString(", "))


Can Scala handle this? If not, why not?

+3




5 answers


From Luben's comment, this can indeed be done, sneakily:

sc.parallelize(List(("a", 1), ("c", 2), ("e", 4))).flatMap(_.productIterator).collect()




All credit to him. (Though, as Brian points out, this does not preserve type safety.)

0




Tuples are not collections. Unlike Python, where a tuple is essentially just an immutable list, a tuple in Scala is more like a class (or, more precisely, like a Python namedtuple). You cannot "flatten" a tuple, because it is a heterogeneous group of fields.

You can convert a tuple to something iterable by calling .productIterator on it, but what you get back is an Iterator[Any]. You can certainly flatten such a thing, but you will have lost all compile-time type safety. (Most Scala programmers shudder at the sight of the type Any.)
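A quick illustration in plain Scala (no Spark needed) of how productIterator discards the field types:

```scala
// productIterator is available on every TupleN via the Product trait,
// but its element type is Any: the static field types are lost.
val pair: (Int, String) = (1, "a")
val fields: Iterator[Any] = pair.productIterator
val flattened: List[Any] = fields.toList // List(1, "a")

// Even a tuple whose fields share a type comes back as Any:
val homogeneous: List[Any] = ("x", "y").productIterator.toList
```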

+7




There is no great way to do this, but the following approach preserves a bit of type safety:

val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
println(flattened.collect().mkString)


The element type of the flattened RDD will be the closest common supertype of all the types in the tuple. Which, yes, in this case is Any, but if the list were List(("1", "a"), ("2", "b")), it would retain the type String.
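The same inference can be demonstrated on plain Scala collections (a sketch without Spark; RDD.flatMap infers its element type the same way):

```scala
// Mixed field types: the least upper bound of Int and String is Any.
val mixed = List((1, "a"), (2, "b")).flatMap(t => Seq(t._1, t._2))
// mixed: List[Any] = List(1, a, 2, b)

// Homogeneous field types: the inferred element type stays String.
val strings = List(("1", "a"), ("2", "b")).flatMap(t => Seq(t._1, t._2))
// strings: List[String] = List(1, a, 2, b)
```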

+2




val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(x => Array(x))
flattened.collect()


The reason for your error is explained by the flatMap documentation:

flatMap(func): similar to map, but each input element can be mapped to 0 or more output elements (so func should return a Seq rather than a single element).
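The quoted contract is easy to see on plain Scala collections, which share flatMap's semantics with RDD:

```scala
val nested = List(List(1, 2), List(3, 4))

// map produces exactly one output element per input element:
val mapped = nested.map(x => x)   // List(List(1, 2), List(3, 4))

// flatMap expects func to return a collection and concatenates the results:
val flat = nested.flatMap(x => x) // List(1, 2, 3, 4)

// A tuple is not a TraversableOnce, so calling flatMap(x => x) on a
// collection of tuples fails to compile with a type mismatch.
```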

0




As others have said, there is no great way to do this, especially with regard to type safety.

However, if you just want to print the RDD in a nice flat format, you can simply map over the RDD and use mkString:

scala> val myRDD = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
myRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> myRDD.map{case (a,b) => s"$a,$b"}.collect.mkString(",")
res0: String = 1,a,2,b,3,c


0








