Spark Streaming 24x7 with updateStateByKey issue

I am running Spark Streaming 24/7 and using updateStateByKey. Is it possible to run a Spark Streaming job 24/7? If it is, the state held by updateStateByKey will keep growing, so how do I deal with that? Do we need to reset or delete the updateStateByKey state periodically when running 24/7? If not, how and when should it be reset? Or does Spark handle this in a distributed fashion and grow memory/storage dynamically?

I am getting the following errors when the updateStateByKey state grows:

An array out of bounds exception, and:

Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4


How do I deal with this? Please point me to any relevant documentation. I am totally stuck; any help is greatly appreciated. Thanks for your time.



2 answers


Use Option.absent() in Java and None in Scala to remove keys. A working example can be found at http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/.
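For instance, a minimal Scala sketch of that pattern (the events DStream and the running-count state are assumptions for illustration, not taken from the linked post) could look like this:

// events is assumed to be a DStream[(String, Int)]
val stateful = events.updateStateByKey { (newValues: Seq[Int], state: Option[Int]) =>
  if (newValues.isEmpty) None // returning None (Option.absent() in the Java API) drops the key from state
  else Some(state.getOrElse(0) + newValues.sum) // otherwise keep a running total for the key
}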





Update the key's state with None to remove it from Spark. If you want to keep a key for a certain amount of time, you can attach an expiration timestamp to its state and check it on every batch (see the sketch after the example below).

For example, here is code that counts records per minute:



// lines is a DStream[String]; currentMinute is the minute bucket for the record, assumed computed elsewhere
val counts = lines.map(line => (currentMinute, 1))
val countsWithState = counts.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  if (values.isEmpty) { // every key with existing state is visited, even if this batch has no records for it
    println("values is empty")
    None // returning None removes the key from Spark's state
  } else {
    println("values size " + values.size)
    Some(state.getOrElse(0) + values.sum) // add the new records to the running count
  }
}
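And for the expiration idea mentioned above, here is a minimal sketch that keeps a last-seen timestamp alongside the count, reusing the counts stream from the example. The ssc context, the checkpoint path, and the 30-minute TTL are assumptions for illustration, not part of the original answer:

ssc.checkpoint("/tmp/spark-checkpoint") // updateStateByKey requires a checkpoint directory

case class TimedCount(count: Int, lastSeen: Long)
val ttlMs = 30 * 60 * 1000L // drop a key after 30 minutes without new records (assumed TTL)

val countsWithTtl = counts.updateStateByKey { (values: Seq[Int], state: Option[TimedCount]) =>
  val now = System.currentTimeMillis()
  if (values.nonEmpty)
    Some(TimedCount(state.map(_.count).getOrElse(0) + values.sum, now)) // new data: update count and timestamp
  else if (state.exists(s => now - s.lastSeen < ttlMs))
    state // no new data, but not expired yet: keep the existing state
  else
    None // expired: remove the key from Spark's state
}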







