How to flatten tuples in Spark?
I want to flatten an RDD of tuples (using a no-op map), but I get an error like:
val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().mkString(", "))
gives
error: type mismatch;
 found   : (Int, String)
 required: TraversableOnce[?]
       val flattened = fromTuples.flatMap(x => x)
The equivalent RDD of Lists or Arrays works fine, for example:
val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().mkString(", "))
Can Scala handle this? If not, why not?
From Luben's comment, this can indeed be done, concisely:
sc.parallelize(List(("a", 1), ("c", 2), ("e", 4))).flatMap(_.productIterator).collect()
All credit to him. (Though, as Brian points out, this does not preserve type safety.)
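For illustration, a quick sketch (assuming a SparkContext named sc, as in the question) of what the types look like with this approach:
val flat = sc.parallelize(List(("a", 1), ("c", 2), ("e", 4)))
  .flatMap(_.productIterator)  // productIterator yields Iterator[Any], so this is an RDD[Any]
flat.collect()                 // Array[Any] = Array(a, 1, c, 2, e, 4)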
Tuples are not collections. Unlike Python, where a tuple is essentially just an immutable list, a tuple in Scala is more like a class (or like a Python namedtuple). You cannot "flatten" a tuple, because it is a heterogeneous group of fields.
You can turn a tuple into something iterable by calling .productIterator on it, but what you get back is an Iterator[Any]. You can certainly flatten such a thing, but you have lost all compile-time type protection. (Most Scala programmers shudder at the thought of a collection of type Any.)
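A minimal, Spark-free sketch of what productIterator gives you (plain Scala, just to show the types):
val t = (1, "a", 2.0)        // Tuple3[Int, String, Double] -- heterogeneous fields
val it = t.productIterator   // Iterator[Any]: the per-field types are gone
it.toList                    // List[Any] = List(1, a, 2.0)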
There is no great way to do it, but this approach preserves a bit of type safety:
val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
println(flattened.collect().mkString)
The type of flattened will be an RDD of the closest common parent of all the types in the tuple. Which, yes, in this case is Any, but if the list were List(("1", "a"), ("2", "b")), it would retain the type String (i.e. RDD[String]).
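A small sketch of that second case (assuming the same sc; the name strings is just for illustration):
val strings = sc.parallelize(List(("1", "a"), ("2", "b")))
val flat = strings.flatMap(t => Seq(t._1, t._2))  // both fields are String, so this is RDD[String]
flat.collect()                                    // Array(1, a, 2, b)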
val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(x => Array(x._1, x._2))
flattened.collect()
The reason for your error is given by the Spark documentation for flatMap:
flatMap(func) — Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
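For contrast, a minimal sketch (assuming the same sc) of how map and flatMap treat the same per-element Seq:
val pairs = sc.parallelize(List((1, "a"), (2, "b")))
pairs.map(x => Seq(x._1, x._2)).collect()      // Array(List(1, a), List(2, b)) -- still nested
pairs.flatMap(x => Seq(x._1, x._2)).collect()  // Array(1, a, 2, b)             -- flattened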
As others have said, there is no great way to do this, especially with regard to type safety.
However, if you just want to print the RDD in a nice flat format, you can simply map over the RDD and use mkString:
scala> val myRDD = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
myRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> myRDD.map{case (a,b) => s"$a,$b"}.collect.mkString(",")
res0: String = 1,a,2,b,3,c