Spark: How do I pass a PartialFunction to a DStream?

I am trying to pass a partial function that concatenates all the RDDs captured in a sliding window of a DStream. Say I set up a window operation of 10 seconds on a stream with a 1-second batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))

My window will have many RDDs. I want to combine all K of them with collect(f: PartialFunction[T, U]). I could call the concatenation operator ++ inside foreachRDD, but foreachRDD returns Unit; I want to get an RDD back and avoid side effects.
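
For illustration, this is roughly what that rejected workaround would look like (a sketch assuming the stream carries Strings; the combined accumulator is hypothetical, and its mutation is exactly the side effect I want to avoid):

// Side-effecting workaround: fold each windowed batch into a mutable var.
// foreachRDD returns Unit, so the result is only observable through the var.
var combined = ssc.sparkContext.emptyRDD[String]
window.foreachRDD { rdd => combined = combined ++ rdd }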

What I'm looking for is a reducer like

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]

on a DStream, which I could use like so:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc))

But this is not available in the Spark Streaming API.

Does anyone have a good idea for combining the RDDs collected in a window into a single RDD so that I can pass them through a partial function? Or for implementing my own RDD reducer? Perhaps such a feature is coming in a future Spark release?



1 answer


Partial functions are not directly supported by DStream operations, but it is not difficult to achieve the same functionality.

For example, take a trivial partial function that maps a String to an Int when the String is a number:

import scala.util.Try

val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }

And we have a stream of Strings:

val stringDStream: DStream[String] = ??? // use your stream source here

Then we can apply the partial function to the DStream like this:

// keep only the elements where pf is defined, then apply it
val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)

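Equivalently, one could apply the partial function at the RDD level: a windowed DStream delivers each window as a single unioned RDD per batch, so DStream.transform exposes that RDD, where RDD.collect with a partial function filters and maps in one step (a sketch; intWindow and the 10-second window are illustrative):

// transform hands us the RDD backing each windowed batch, and collect(pf)
// keeps and converts only the elements on which pf is defined.
val intWindow = stringDStream.window(Seconds(10)).transform(_.collect(pf))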