Spark: How do I pass a PartialFunction to a DStream?
I am trying to pass a partial function to concatenate all of the RDDs captured in a DStream over a sliding window. Say I set up a 10-second window operation on a stream sampled every second:
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))
My window will contain many RDDs. I want to use collect(f: PartialFunction[T, U]) on the concatenation of all K of these RDDs. I could apply the concatenation operator ++ with foreachRDD, but that returns Unit; I want to return an RDD and avoid side effects.
What I'm looking for is a reducer on DStream like
def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]
which I could use like so:
window.reduce(_ ++ _).transform(_.collect(myPartialFunc))
But this is not available in the Spark Streaming API.
Does anyone have a good idea for combining the RDDs in a stream into a single RDD so that I can pass it a partial function? Or for implementing my own RDD reducer? Perhaps this feature is coming in a future Spark release?
Partial functions are not directly supported by DStream operations, but it is not difficult to achieve the same functionality.
For example, take a trivial partial function that converts a String to an Int when the string represents a number:
import scala.util.Try
val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }
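As a quick sanity check outside of Spark, the partial function behaves as you would expect in plain Scala (isDefinedAt guards application):

```scala
import scala.util.Try

// The same trivial partial function: defined only for numeric strings.
val pf: PartialFunction[String, Int] = {
  case x if Try(x.toInt).isSuccess => x.toInt
}

println(pf.isDefinedAt("42"))  // true
println(pf.isDefinedAt("abc")) // false
println(pf("42"))              // 42
```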
And we have a stream of lines:
val stringDStream:DStream[String] = ??? // use your stream source here
Then we can apply the partial function to the DStream like this:
val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
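The filter-then-map pair is exactly what collect with a partial function does. A small Spark-free sketch over a plain Scala collection (standing in here for the elements of one batch) shows the equivalence:

```scala
import scala.util.Try

// Same partial function as above: defined only for numeric strings.
val pf: PartialFunction[String, Int] = {
  case x if Try(x.toInt).isSuccess => x.toInt
}

val lines = Seq("1", "two", "3", "", "44")

// What the DStream pipeline does, expressed on a collection:
val viaFilterMap = lines.filter(pf.isDefinedAt).map(pf)

// Scala collections also offer collect(pf) directly, which is equivalent:
val viaCollect = lines.collect(pf)

println(viaFilterMap) // List(1, 3, 44)
println(viaCollect)   // List(1, 3, 44)
```

Since RDD has the same collect(f: PartialFunction[T, U]) overload mentioned in the question, an alternative (assuming your Spark version provides it) is to apply it per batch with window.transform(_.collect(pf)).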