Is it possible to update a value in real time with Spark Streaming?

Suppose I have a stream of double values and I want to compute their average every ten seconds. How can I have a sliding window that doesn't recompute the average from scratch, but instead updates it, say by removing the values from the oldest ten seconds and adding only the values from the newest ten seconds?



1 answer


TL;DR: use reduceByWindow with both of its function arguments (jump to the last paragraph for the code snippet).

There are two interpretations of your question: a specific one (how do I get a running average over one hour, updated every 2 seconds) and a general one (how do I get a computation that updates its state incrementally). Here's the answer to the general one.

First, note that there is a way to represent your data such that your average-with-updates is easy to compute, based on a windowed DStream: this presents your data as an incremental construction of the stream, with maximal sharing. But, as you noted, recomputing the average on every batch is computationally less efficient.
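For contrast, here is a minimal sketch of that recompute-everything approach; the stream name and the durations are placeholders, not from your setup:

import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.dstream.DStream

val values: DStream[Float]

// Naive windowed mean: the full window is re-reduced on every slide.
val naiveMeans = values
  .map(v => (v, 1L))
  .window(Durations.seconds(3600), Durations.seconds(2))
  .reduce { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
  .map { case (sum, count) => sum / count }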

If you want to update a complex stateful computation that is not invertible, but you don't want to touch the stream's construction, there is updateStateByKey. But Spark won't help you reflect the incremental aspect of your computation in the stream; you have to manage it yourself.
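As a rough illustration of what managing it yourself could look like, here is a sketch with updateStateByKey for a hypothetical stream of readings keyed by a sensor id (the key type and all names are made up for the example; checkpointing must be enabled with ssc.checkpoint(...)):

val keyedReadings: DStream[(String, Float)]

// Keep a running (sum, count) per key across batches.
val perKeyState = keyedReadings
  .mapValues(v => (v, 1L))
  .updateStateByKey[(Float, Long)] { (batch: Seq[(Float, Long)], state: Option[(Float, Long)]) =>
    val (s0, n0) = state.getOrElse((0f, 0L))
    // Fold this batch into the accumulated state. Nothing here subtracts
    // values that have left the window; that is the part you manage yourself.
    Some(batch.foldLeft((s0, n0)) { case ((s, n), (v, c)) => (s + v, n + c) })
  }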



Here, you have something simple and invertible, and you have no notion of keys. You can use reduceByWindow with its inverse-reduction argument, using the usual functions that let you compute an incremental average.

import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.dstream.DStream

val myInitialDStream: DStream[Float]

// Pair each value with a count of 1 so a running (sum, count) can be kept.
val myDStreamWithCount: DStream[(Float, Long)] =
  myInitialDStream.map((x) => (x, 1L))

// Add an incoming batch's (sum, count) to the running (sum, count).
def addOneBatchToMean(previousMean: (Float, Long), newBatch: (Float, Long)): (Float, Long) =
  (previousMean._1 + newBatch._1, previousMean._2 + newBatch._2)

// Subtract the (sum, count) of a batch that has slid out of the window.
def removeOneBatchFromMean(previousMean: (Float, Long), oldBatch: (Float, Long)): (Float, Long) =
  (previousMean._1 - oldBatch._1, previousMean._2 - oldBatch._2)

// 1-hour window, sliding every 2 seconds. Note that the inverse-function
// variant of reduceByWindow requires checkpointing (ssc.checkpoint(...)).
val runningMeans = myDStreamWithCount.reduceByWindow(
  addOneBatchToMean, removeOneBatchFromMean,
  Durations.seconds(3600), Durations.seconds(2))

You get a stream of singleton RDDs, each containing a pair (m, n), where m is the running sum over the 1-hour window and n is the number of elements in that window. Just return (or map over) m / n to get the average.
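Concretely, assuming the runningMeans stream from the snippet above, that last step is a one-liner:

// Divide the windowed sum by the windowed count to get the mean per window.
val means: DStream[Float] = runningMeans.map { case (sum, count) => sum / count }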
