PySpark broadcast variable join

I am doing a join, and my data is spread across more than 100 nodes. I have a small list of key/value pairs that I am joining with another key/value RDD.

My list looks like this:

[[1, 0], [2, 0], [3, 0], [4, 0], [5, 0], [6, 0], [7, 0], [8, 0], [9, 0], [10, 0], [11, 0], [16, 0], [18, 0], [19, 0], [20, 0], [21, 0], [22, 0], [23, 0], [24, 0], [25, 0], [26, 0], [27, 0], [28, 0], [29, 0], [36, 0], [37, 0], [38, 0], [39, 0], [40, 0], [41, 0], [42, 0], [44, 0], [46, 0]]


I have a broadcast variable:

numB = sc.broadcast(numValuesKV)


When I join:

numRDD = columnRDD.join(numB.value)


I am getting the following error:

AttributeError: 'list' object has no attribute 'map'




3 answers


You are passing in a list, which is absolutely fine.

What you need to do is:



b = sc.broadcast(dict(lst))  # broadcast the small list as a lookup dict
rdd.filter(lambda t: t[0] in b.value)  # keep pairs whose key is in the broadcast


Here each element t looks like [1, 0] and so on, so t[0] is the key. But I hope you get the idea....
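If you also want the matching value from the broadcast dict attached, so the result looks like an actual join rather than just a filter, a minimal sketch building on the snippet above (still assuming b broadcasts the small list as a dict):

# attach the broadcast value to each surviving pair
joined = rdd.filter(lambda t: t[0] in b.value) \
            .map(lambda t: (t[0], (t[1], b.value[t[0]])))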



You can try making numValuesKV a dictionary and see if it works.
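For example, a hedged sketch of what that could look like, using the names from the question and assuming columnRDD holds (key, value) pairs:

# broadcast the small list as a dict for fast lookups
numB = sc.broadcast(dict(numValuesKV))
# look each key up in the broadcast dict instead of calling join
numRDD = columnRDD.map(lambda kv: (kv[0], (kv[1], numB.value.get(kv[0]))))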





rdd.join(other)

joins two RDDs, so it expects other to be an RDD. To use the efficient small-table join trick, you need to do the join manually. In Scala it would look like this:

rdd.mapPartitions { iter =>
  val valueMap = numB.value.toMap  // build the lookup map once per partition
  iter.map { case (k, v) => (k, (v, valueMap(k))) }
}


This applies the join using the broadcast value to each RDD partition in a distributed fashion.

PySpark's code should be very similar.
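For instance, a minimal PySpark sketch of the same map-side join, assuming numB broadcasts the small key/value list from the question and columnRDD holds (key, value) pairs:

def join_partition(iterator):
    value_map = dict(numB.value)  # build the lookup dict once per partition
    for k, v in iterator:
        if k in value_map:  # inner join: drop keys missing from the broadcast
            yield (k, (v, value_map[k]))

numRDD = columnRDD.mapPartitions(join_partition)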


