Spark streaming 24X7 with updateStateByKey issue
I am running Spark Streaming 24/7 and using updateStateByKey. Is it possible to run a Spark Streaming job 24/7? If so, the state held by updateStateByKey will keep growing; how do I deal with that? Do we need to reset or delete the updateStateByKey state periodically when running 24/7, and if so, how and when? Or does Spark handle this in a distributed fashion, growing memory/storage dynamically?
I am getting the following errors as the updateStateByKey state grows:
Array out of bound exception
Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
How do I deal with this? Please point me to any relevant documentation. I am totally stuck; any help is greatly appreciated. Thanks for your time.
Use Optional.absent() in Java or None in Scala to remove keys from the state. A working example can be found at http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ .
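As a minimal sketch of the Scala side of this: the function you pass to updateStateByKey returns an Option, and returning None for a key removes that key from the state. The function below is a standalone model of such an update function (the name `update` and the "drop when no new values" policy are illustrative, not from the linked post):

```scala
// Shape of an updateStateByKey update function: (new values, old state) => new state.
// Returning None removes the key; returning Some keeps (and updates) it.
def update(values: Seq[Int], state: Option[Int]): Option[Int] =
  if (values.isEmpty) None                   // no new records for this key: drop it
  else Some(state.getOrElse(0) + values.sum) // otherwise accumulate the running count

// In a real job you would pass it to the DStream:
//   val stateful = pairs.updateStateByKey(update _)
```

Because it is a plain function, the removal logic can be unit-tested without starting a StreamingContext.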
Update a key's state with None to remove it from Spark's state. If you want to keep a key alive for a certain amount of time, you can attach an expiration timestamp to its state and check it on every batch.
For example, here is code that counts records per minute:
// `lines` is a DStream[String]; key each record by the minute it arrived in.
// (One possible definition of the original's undefined `currentMinute`.)
val counts = lines.map { line =>
  val currentMinute = System.currentTimeMillis() / 60000
  (currentMinute, 1)
}
val countsWithState = counts.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  if (values.isEmpty) { // every key in state is visited, even with no records in this batch
    println("values is empty")
    None // returning None removes the key from Spark's state
  } else {
    println("values size " + values.size)
    Some(state.sum + values.sum) // Option.sum is 0 for None, so this accumulates the count
  }
}