How does groupByKey work on an RDD with DenseVector keys in Spark?

I have created an RDD in which each element is a key-value pair, the key being a DenseVector and the value being an int, e.g.

[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]

Now I want to group on the key k1: DenseVector([3,4]). I expect all the values for k1, namely 10 and 20, to be grouped together. But I get

[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]

instead of

[(DenseVector([3,4]), [10,20])]

Please let me know if I am missing something.

Code for it:

# simplified version of the code
# rdd1 is an RDD containing [(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)]
rdd1.groupByKey().map(lambda x: (x[0], list(x[1])))
print(rdd1.collect())



1 answer


Well, this is a tricky question, and the short answer is: you can't. To understand why, you need to look at the DenseVector implementation. DenseVector is just a wrapper around a NumPy float64 ndarray:

>>> from pyspark.mllib.linalg import DenseVector
>>> dv1 = DenseVector([3.0, 4.0])
>>> type(dv1.array)
<type 'numpy.ndarray'>
>>> dv1.array.dtype
dtype('float64')

      

Since NumPy ndarrays, unlike DenseVector, are mutable, they cannot be hashed in a meaningful way, although, interestingly, they do provide a __hash__ method. There is a question that covers this issue (see numpy ndarray hashability).

>>> dv1.array.__hash__() is None
False
>>> hash(dv1.array)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'numpy.ndarray'

      

DenseVector inherits its __hash__ method from object, and that hash is simply based on id (the memory address of a given instance):

>>> id(dv1) / 16 == hash(dv1)
True

      



Unfortunately, this means that two DenseVectors with the same content have different hashes:

>>> dv2 = DenseVector([3.0, 4.0])
>>> hash(dv1) == hash(dv2)
False

      

What can you do? The simplest option is to use an immutable data structure that provides a consistent hash implementation, such as a tuple:

rdd.groupBy(lambda kv: tuple(kv[0]))
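
If you want the grouped result keyed by DenseVector again, a minimal sketch could look like the following (assuming pyspark with an active SparkContext sc; rdd1 and grouped are illustrative names): convert the keys to tuples before grouping, then rebuild the vectors afterwards.

from pyspark.mllib.linalg import DenseVector

# example data mirroring the question
rdd1 = sc.parallelize([(DenseVector([3, 4]), 10), (DenseVector([3, 4]), 20)])

grouped = (rdd1
    .map(lambda kv: (tuple(kv[0]), kv[1]))                # tuples are hashable
    .groupByKey()
    .map(lambda kv: (DenseVector(kv[0]), list(kv[1]))))   # rebuild the vectors

print(grouped.collect())
# [(DenseVector([3.0, 4.0]), [10, 20])]

Note that the rebuilt keys are new DenseVector objects, so this is only useful for producing the final result, not for further key-based operations on the vectors themselves.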

      

Note: in practice, using arrays as keys is most likely a bad idea anyway. With a large number of elements, hashing can become too expensive to be useful. That said, if you really want something like this, Scala works just fine:

import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(
    (Vectors.dense(3, 4), 10) :: (Vectors.dense(3, 4), 20) :: Nil)
rdd.groupByKey.collect

      
