How to periodically update an RDD in a Spark stream
My code looks something like this:
sc = SparkContext()
ssc = StreamingContext(sc, 30)
initRDD = sc.parallelize('path_to_data')
lines = ssc.socketTextStream('localhost', 9999)
res = lines.transform(lambda x: x.join(initRDD))
res.pprint()
My question is that initRDD
needs to be updated every day at midnight.
Here is my best attempt:
sc = SparkContext()
ssc = StreamingContext(sc, 30)
lines = ssc.socketTextStream('localhost', 9999)
def func(rdd):
    initRDD = rdd.context.parallelize('path_to_data')
    return rdd.join(initRDD)
res = lines.transform(func)
res.pprint()
But with this, initRDD
is reloaded every 30 seconds, following the batchDuration.
Is there a better way to do this?
1 answer
One option is to check a deadline before the transform
. The check is a simple comparison and is therefore cheap to do at each batch interval:
def nextDeadline() : Long = {
  // assumes midnight on UTC timezone.
  LocalDate.now.atStartOfDay().plusDays(1).toInstant(ZoneOffset.UTC).toEpochMilli()
}

// Note this is a mutable variable!
var initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")

// Note this is a mutable variable!
var _nextDeadline = nextDeadline()

val lines = ssc.socketTextStream("localhost", 9999)

// we use the foreachRDD as a scheduling trigger.
// We don't use the data, only the execution hook
lines.foreachRDD{ _ =>
  if (System.currentTimeMillis > _nextDeadline) {
    initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
    _nextDeadline = nextDeadline()
  }
}

// if the rdd was updated, it will be picked up in this stage.
val res = lines.transform(rdd => rdd.join(initRDD))
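For reference, a rough PySpark version of the same idea, adapted to the code in the question, could look like the sketch below. The path, the use of sc.textFile, and the join itself are placeholders carried over from the question, not a tested pipeline; the point is that the deadline check runs on the driver once per batch, while the reload only happens after midnight has passed.

import time
from datetime import datetime, timedelta

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def next_deadline():
    # next midnight in the local timezone, as a Unix timestamp
    tomorrow = datetime.now().date() + timedelta(days=1)
    return datetime.combine(tomorrow, datetime.min.time()).timestamp()

sc = SparkContext()
ssc = StreamingContext(sc, 30)

# driver-side mutable state: the reference data and the next reload deadline
state = {
    'init_rdd': sc.textFile('path_to_data'),  # placeholder load, as in the question
    'deadline': next_deadline(),
}

lines = ssc.socketTextStream('localhost', 9999)

def maybe_reload(_):
    # cheap comparison, executed on the driver once per batch
    if time.time() > state['deadline']:
        state['init_rdd'] = sc.textFile('path_to_data')
        state['deadline'] = next_deadline()

# foreachRDD is used only as a per-batch scheduling hook; the data is ignored
lines.foreachRDD(maybe_reload)

# transform re-evaluates its closure every batch, so it picks up the latest RDD
res = lines.transform(lambda rdd: rdd.join(state['init_rdd']))
res.pprint()

ssc.start()
ssc.awaitTermination()

As in the Scala version, transform sees the new RDD on the first batch after the deadline, because its closure is re-evaluated on the driver at every batch interval.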