Spark Streaming and ElasticSearch - Failed to write all records
I am currently writing a Scala application consisting of a producer and a consumer. The producer fetches data from an external source and writes it to Kafka. The consumer reads from Kafka and writes to Elasticsearch.
The consumer is based on Spark Streaming: every 5 seconds it fetches new messages from Kafka and writes them to Elasticsearch. The problem is that I cannot write to ES because I get many errors like the one below:
[ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: Error in TaskCompletionListener
org.elasticsearch.hadoop.EsHadoopException: Failed to write all records [3/26560] (possibly ES was overloaded?). Bailing out...
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na]
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Note that the producer writes only 6 messages every 15 seconds, so I really don't understand how this "overload" could happen (I even cleared the topic and flushed all the old messages, thinking it was an offset issue). The task that Spark Streaming runs every 5 seconds can be summarized by the following code:
val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)

val convertedResult = result.map(k => (k._1, AvroToJsonUtil.avroToJson(k._2)))

// TODO: remove hardcoded resource ("yahoo/yahoo") parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()} ***")

convertedResult.foreachRDD { rdd =>
  rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
}
If I print the messages instead of sending them to ES, everything is fine, and I indeed see only 6 messages. Why can't I write to ES?
For the sake of completeness, I'm writing to ES with the elasticsearch-spark_2.10 library, latest beta (2.1.0.Beta3).
After many attempts, I found a way to write to Elasticsearch without any errors: passing the parameter "es.batch.size.entries" -> "1" to the saveToEs method solved the problem. I don't understand why the default (or any other) batch size produces the error above, given that I would expect an error only when trying to write more than the maximum allowed batch size, not less.
Also, I noticed that I was actually writing to ES, but not all of my messages arrived: I was losing 1 to 3 messages per batch.
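For reference, a minimal sketch of the options map this implies. The "es.batch.size.entries" key is the one named above; "es.batch.write.retry.count" and "es.batch.write.retry.wait" are standard es-hadoop settings I'm adding as a hedge against transient bulk rejections, and the index "yahoo/yahoo" comes from the question:

```scala
// Sketch: es-hadoop write options, assuming the DStream and index
// from the question. Only es.batch.size.entries is from the answer;
// the retry settings are standard es-hadoop options added here as
// an extra safeguard.
object EsWriteConfig {
  val esOptions: Map[String, String] = Map(
    "es.input.json"              -> "true", // payloads are already JSON
    "es.batch.size.entries"      -> "1",    // one document per bulk request
    "es.batch.write.retry.count" -> "3",    // retry rejected bulk requests
    "es.batch.write.retry.wait"  -> "10s"   // wait between retries
  )

  // Usage inside the streaming job:
  // convertedResult.foreachRDD { rdd =>
  //   rdd.map(_._2).saveToEs("yahoo/yahoo", EsWriteConfig.esOptions)
  // }
}
```

A batch size of 1 trades throughput for reliability; with 6 messages per 15 seconds that trade-off is harmless here.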
When I pushed a DataFrame to ES from Spark, I got the same error message, even with the configuration "es.batch.size.entries" -> "1". The problem went away once I increased the bulk thread pool in ES.
For example, in elasticsearch.yml:

# Bulk pool
threadpool.bulk.type: fixed
threadpool.bulk.size: 600
threadpool.bulk.queue_size: 30000