ReduceByKey with byte array as key

I would like to work with RDDs of pairs Tuple2<byte[], obj>, but byte[] keys with the same content are treated as different keys because their references differ.

I haven't seen any way to pass in a custom comparator. I could convert the byte[] to a String with an explicit encoding, but I'm wondering if there is a more efficient way.
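For illustration, a minimal sketch of the underlying issue (assuming a running SparkContext named sc; the variable names are made up):

val a = "key".getBytes("UTF-8")
val b = "key".getBytes("UTF-8")
a == b            // false: arrays compare by reference identity on the JVM
a.sameElements(b) // true: the contents are identical

// Pairs keyed by byte[] therefore behave as if every key were distinct, and
// Spark's default partitioner rejects array keys altogether (see the test in
// the accepted answer below).
val pairs = sc.parallelize(Seq((a, 1), (b, 1)))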

+3




2 answers


A custom comparator isn't sufficient, because Spark uses the hashCode of the key objects to organize keys into partitions. (At least the HashPartitioner does this; you could provide a custom partitioner that can deal with arrays.)
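As a side note, such a custom partitioner might look like the following sketch (my own illustration, not part of the original answer), hashing array keys by content via java.util.Arrays.hashCode. Note that it only fixes the partitioning step: grouping within a partition still relies on the key's own hashCode and equals, which is why the wrapper below is the actual fix.

import org.apache.spark.Partitioner

// Content-aware partitioner for byte-array keys (illustrative sketch).
class ByteArrayPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case bytes: Array[Byte] =>
      // hash by content instead of by reference
      (java.util.Arrays.hashCode(bytes) & Integer.MAX_VALUE) % numPartitions
    case other =>
      (other.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}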

Wrapping the array to provide proper equals and hashCode should solve the issue. A lightweight wrapper will do the trick:

// Wraps a byte array so that equality and hashing are based on the array's
// contents (via .deep) rather than its reference identity.
class SerByteArr(val bytes: Array[Byte]) extends Serializable {
    override val hashCode = bytes.deep.hashCode
    override def equals(obj: Any) =
        obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}

Quick test:

import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.

// let's use the wrapper instead
val keyable = rdd.map(elem =>  new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100

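Since the question is about reduceByKey, the same wrapper should work there as well; a sketch continuing the variables from the quick test above:

val counts = keyable.map(k => (k, 1)).reduceByKey(_ + _)
counts.count
// should also be 100: one entry per distinct generated value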
+5




You can create a wrapper class around the byte[] and define your own equality and hashCode functions. This is probably slightly faster than converting to a String, since you don't need to make a copy of the array (although you still pay for an object allocation).
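For instance, such a wrapper might be written with java.util.Arrays (my sketch, with the assumed class name ByteArrayKey), so the backing array is neither copied nor converted:

class ByteArrayKey(val bytes: Array[Byte]) extends Serializable {
  // content-based hash and equality, computed directly over the array
  override def hashCode: Int = java.util.Arrays.hashCode(bytes)
  override def equals(other: Any): Boolean = other match {
    case that: ByteArrayKey => java.util.Arrays.equals(this.bytes, that.bytes)
    case _                  => false
  }
}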



+1








