Distinct call on 'pyspark.resultiterable.ResultIterable'
I am writing some Spark code and I have an RDD that looks like this:
[(4, <pyspark.resultiterable.ResultIterable at 0x9d32a4c>),
(1, <pyspark.resultiterable.ResultIterable at 0x9d32cac>),
(5, <pyspark.resultiterable.ResultIterable at 0x9d32bac>),
(2, <pyspark.resultiterable.ResultIterable at 0x9d32acc>)]
What I need to do is call distinct on each pyspark.resultiterable.ResultIterable.
I tried this:
def distinctHost(a, b):
    p = sc.parallelize(b)
    return (a, p.distinct())
mydata.map(lambda x: distinctHost(*x))
But I am getting the error:
Exception: It seems that you are trying to reference the SparkContext from a broadcast variable, action, or transform. The SparkContext can only be used in the driver, not in the code it runs on workers. For more information, see SPARK-5063.
The error explains that I cannot use sc inside the function. But I need to find a way to convert the pyspark.resultiterable.ResultIterable into an RDD so I can call distinct on it.
A simple approach is to use sets:
from numpy.random import choice, seed
seed(323)
keys = (4, 1, 5, 2)
hosts = [
    u'in24.inetnebr.com',
    u'ix-esc-ca2-07.ix.netcom.com',
    u'uplherc.upl.com',
    u'slppp6.intermind.net',
    u'piweba4y.prodigy.com'
]
pairs = sc.parallelize(zip(choice(keys, 20), choice(hosts, 20))).groupByKey()
# mapValues applies set() to each group and avoids the Python 2-only lambda (k, v) syntax
pairs.mapValues(set).take(3)
Result:
[(1, {u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net'}),
(2,
{u'in24.inetnebr.com',
u'ix-esc-ca2-07.ix.netcom.com',
u'slppp6.intermind.net',
u'uplherc.upl.com'}),
(4, {u'in24.inetnebr.com', u'piweba4y.prodigy.com', u'uplherc.upl.com'})]
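This works because a ResultIterable is an ordinary Python iterable, so any plain Python function can deduplicate it on the workers without touching sc. The sketch below shows that idea locally, without Spark; the helper name distinct_sorted and the sample hosts are made up for illustration, and sorting is only one possible choice for getting a deterministic list instead of a set.

```python
def distinct_sorted(values):
    """Return the unique items of any iterable as a sorted list."""
    return sorted(set(values))

# Local stand-in for the (key, iterable) pairs that groupByKey() produces.
# The iterators here play the role of ResultIterable objects.
grouped = [
    (1, iter(["a.com", "b.com", "a.com"])),
    (2, iter(["c.com"])),
]

result = [(key, distinct_sorted(values)) for key, values in grouped]
print(result)
```

On a real grouped RDD the same helper could be passed to mapValues, for example pairs.mapValues(distinct_sorted).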
If there is a specific reason to use rdd.distinct, you can try something like this:
def distinctHost(pairs, key):
    return (pairs
        .filter(lambda kv: kv[0] == key)  # keep only the requested key
        .flatMap(lambda kv: kv[1])        # flatten its ResultIterable into plain values
        .distinct())
[(key, distinctHost(pairs, key).collect()) for key in pairs.keys().collect()]
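Note that the list comprehension above runs a separate Spark job for every key. If that becomes slow, one possible alternative is to flatten the grouped pairs back into (key, host) tuples, deduplicate them once with rdd.distinct, and regroup. The following is only a sketch under that assumption: the function name is hypothetical, and it expects an RDD shaped like pairs (keys mapped to iterables of hashable values).

```python
def distinct_values_single_pass(grouped_rdd):
    """Deduplicate the values of every key in one pass over the data.

    grouped_rdd is assumed to be an RDD of (key, iterable) pairs,
    such as the output of groupByKey().
    """
    return (grouped_rdd
            .flatMapValues(lambda values: values)  # (key, iterable) -> many (key, value)
            .distinct()                            # drop duplicate (key, value) tuples
            .groupByKey()                          # regroup the surviving values
            .mapValues(sorted))                    # materialize each group as a sorted list

# Hypothetical usage on the driver:
#   distinct_values_single_pass(pairs).collect()
```

Compared with the per-key loop, this shuffles the data twice but keeps the work distributed instead of collecting one key at a time on the driver.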