How to groupByKey an RDD with DenseVector keys in Spark?
I have created an RDD in which each element is a key-value pair, with the key being a DenseVector and the value being an int, e.g.
[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]
Now I want to group by the key k1: DenseVector([3,4]). I expect grouping to collect all the values for k1, namely 10 and 20. But I get the result
[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]
instead of
[(DenseVector([3,4]), [10,20])]
Please let me know if I am missing something.
Code for it:
# simplified version of code
# rdd1 is an RDD containing [(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]
grouped = rdd1.groupByKey().map(lambda x: (x[0], list(x[1])))
print(grouped.collect())
Well, this is a tricky question, and the short answer is: you can't. To understand why, you need to look deeper into the DenseVector implementation. DenseVector is just a wrapper around a NumPy float64 ndarray:
>>> dv1 = DenseVector([3.0, 4.0])
>>> type(dv1.array)
<type 'numpy.ndarray'>
>>> dv1.array.dtype
dtype('float64')
Since NumPy ndarrays, unlike DenseVector, are mutable, they cannot be hashed in a meaningful way, although, interestingly, they do provide a __hash__ method. There is a question that covers this issue (see numpy ndarray hashability):
>>> dv1.array.__hash__() is None
False
>>> hash(dv1.array)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'numpy.ndarray'
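To see why mutability rules out a meaningful hash, here is a small sketch (a hypothetical illustration, not anything Spark actually does; the array's raw bytes stand in for a content-based hash):
import numpy as np

a = np.array([3.0, 4.0])
d = {a.tobytes(): "value"}   # pretend we could hash the array by content
a[0] = 99.0                  # mutate the "key" after insertion
print(a.tobytes() in d)      # False - the content hash no longer matches the stored one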
DenseVector inherits its __hash__ method from object, which is simply based on id (the memory address of the given instance):
>>> id(dv1) / 16 == hash(dv1)
True
Unfortunately, this means that two DenseVectors with the same content have different hashes:
>>> dv2 = DenseVector([3.0, 4.0])
>>> hash(dv1) == hash(dv2)
False
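You can see the consequence without Spark at all. A plain Python dict, which uses the same hash-then-equality lookup that key-based grouping relies on, treats the two vectors as distinct keys even though they compare equal:
>>> dv1 == dv2
True
>>> groups = {}
>>> for key, value in [(dv1, 10), (dv2, 20)]:
...     groups.setdefault(key, []).append(value)
...
>>> len(groups)
2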
What can you do about it? The simplest option is to use an immutable data structure that provides a consistent hash implementation, such as a tuple:
rdd.groupBy(lambda kv: tuple(kv[0]))
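For completeness, a minimal end-to-end sketch of that workaround, assuming an existing SparkContext sc (mapping the key to a tuple and using groupByKey gives the same result as groupBy above):
from pyspark.mllib.linalg import DenseVector

rdd = sc.parallelize([(DenseVector([3, 4]), 10), (DenseVector([3, 4]), 20)])

grouped = (rdd
    .map(lambda kv: (tuple(kv[0]), kv[1]))               # tuples are hashed by content
    .groupByKey()
    .map(lambda kv: (DenseVector(kv[0]), list(kv[1]))))  # restore DenseVector keys

print(grouped.collect())
# [(DenseVector([3.0, 4.0]), [10, 20])]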
Note: In practice, using arrays as keys is most likely a bad idea: with a large number of elements, hashing itself can become too expensive to be useful. That said, if you really need something like this, the Scala API works just fine:
import org.apache.spark.mllib.linalg.Vectors
val rdd = sc.parallelize(
(Vectors.dense(3, 4), 10) :: (Vectors.dense(3, 4), 20) :: Nil)
rdd.groupByKey.collect
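This works because the Scala Vector classes, unlike the Python wrapper, override equals and hashCode based on the vector's contents, so vectors with identical values end up in the same group.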