ReduceByKey with a byte array as the key
I would like to work with RDD pairs of Tuple2<byte[], obj>, but byte[]s with the same contents are treated as different keys because their references differ. I haven't seen any way to pass in a custom comparator. I could convert the byte[] to a String with an explicit encoding, but I'm wondering if there is a more efficient way.
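To make the problem concrete, a minimal sketch (plain Scala, runnable in a REPL) of what I'm seeing: arrays compare and hash by reference, not by contents.

val a = "key".getBytes("UTF-8")
val b = "key".getBytes("UTF-8")
a == b                         // false: Array equality is reference equality
a.sameElements(b)              // true: the contents match
java.util.Arrays.equals(a, b)  // true: content-based comparison
// a.hashCode and b.hashCode are identity-based, so they (almost surely) differ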
Custom comparators are not sufficient, because Spark uses the hashCode of the key objects to organize keys into partitions. (At least the HashPartitioner does this; you could provide a custom partitioner that can deal with arrays, as sketched below.)
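For completeness, a minimal, untested sketch of that alternative, assuming the keys are plain Array[Byte]; the class name ByteArrayPartitioner is made up for illustration:

import org.apache.spark.Partitioner

// Hypothetical partitioner: routes byte arrays with equal contents to the
// same partition by hashing the contents rather than the reference.
class ByteArrayPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val h = java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])
    ((h % numPartitions) + numPartitions) % numPartitions // non-negative index
  }
}

Note, though, that even with such a partitioner, the per-key grouping inside a partition still relies on the key's own hashCode and equals, so byte arrays with equal contents would still end up in separate groups.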
Wrapping the array to provide a content-based equals and hashCode should fix the problem. A lightweight wrapper will do the trick:
class SerByteArr(val bytes: Array[Byte]) extends Serializable {
  // Content-based hash code: bytes.deep views the array as a sequence, so
  // arrays with equal contents get equal hash codes
  // (on Scala 2.13+, where .deep is gone, java.util.Arrays.hashCode/equals do the same job)
  override val hashCode = bytes.deep.hashCode
  // Content-based equality, again via bytes.deep
  override def equals(obj: Any) =
    obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}
Quick test:
import scala.util.Random

val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.

// let's use the wrapper instead
val keyable = rdd.map(elem => new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100
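Since the question is about reduceByKey, the wrapper applies there in the same way. A short sketch, reusing rdd and SerByteArr from above; the counting values are just for illustration:

// Pair each wrapped key with 1, then reduce: content-equal arrays now collapse
val counts = rdd.map(bytes => (new SerByteArr(bytes), 1)).reduceByKey(_ + _)

// Unwrap when the raw byte arrays are needed again
val unwrapped = counts.map { case (key, n) => (key.bytes, n) }
counts.count // 100, matching the groupByKey result above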