How to periodically update an RDD in a Spark Streaming job

My code looks something like this:

sc = SparkContext()
ssc = StreamingContext(sc, 30)

initRDD = sc.parallelize('path_to_data')
lines = ssc.socketTextStream('localhost', 9999)
res = lines.transform(lambda x: x.join(initRDD))

res.pprint()

      

My problem is that initRDD needs to be updated every day at midnight.

Here is my attempt:

sc = SparkContext()
ssc = StreamingContext(sc, 30)

lines = ssc.socketTextStream('localhost', 9999)


def func(rdd):
    initRDD = rdd.context.parallelize('path_to_data')
    return rdd.join(initRDD)


res = lines.transform(func)

res.pprint()

      

But it seems that initRDD will now be reloaded every 30 seconds, which is the batchDuration I am using.

Is there a good way to do this?


1 answer


One option is to check for a deadline before the transform. The check is a simple comparison, so it is cheap to run on every batch interval:



import java.time.{LocalDate, ZoneOffset}

def nextDeadline() : Long = {
  // next midnight in the UTC timezone, as epoch milliseconds.
  LocalDate.now(ZoneOffset.UTC).atStartOfDay().plusDays(1).toInstant(ZoneOffset.UTC).toEpochMilli()
}
// Note this is a mutable variable!
var initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
// Note this is a mutable variable!
var _nextDeadline = nextDeadline()

val lines = ssc.socketTextStream("localhost", 9999)
// we use the foreachRDD as a scheduling trigger. 
// We don't use the data, only the execution hook
lines.foreachRDD{ _ => 
    if (System.currentTimeMillis > _nextDeadline) {
      initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
      _nextDeadline = nextDeadline()
    }
}
// if the rdd was updated, it will be picked up in this stage.
val res = lines.transform(rdd => rdd.join(initRDD))
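Since the question uses PySpark, here is a rough Python sketch of the same deadline idea. It only covers the caching and deadline logic in plain Python. The names next_midnight_utc and DailyRefresher, and the load callable, are made up for illustration and are not part of any Spark API.

```python
from datetime import datetime, time, timedelta, timezone


def next_midnight_utc(now):
    """Return the first UTC midnight strictly after `now` (an aware datetime)."""
    tomorrow = now.astimezone(timezone.utc).date() + timedelta(days=1)
    return datetime.combine(tomorrow, time.min, tzinfo=timezone.utc)


class DailyRefresher:
    """Caches a loaded value and reloads it once a UTC midnight has passed."""

    def __init__(self, load, clock=lambda: datetime.now(timezone.utc)):
        self._load = load    # hypothetical loader, e.g. one that builds the reference RDD
        self._clock = clock  # injectable so the logic can be exercised without waiting
        self._value = load()
        self._deadline = next_midnight_utc(clock())

    def get(self):
        # Cheap comparison on every call; the loader only runs after the deadline.
        now = self._clock()
        if now >= self._deadline:
            self._value = self._load()
            self._deadline = next_midnight_utc(now)
        return self._value
```

Assuming the function passed to transform is evaluated on the driver once per batch, it could be wired in as something like refresher = DailyRefresher(lambda: sc.textFile('path_to_data')) followed by lines.transform(lambda rdd: rdd.join(refresher.get())). Note that join needs key/value pairs on both sides, so both the stream and the loaded data would have to be mapped to pairs first.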

      
