Spark Streaming and ElasticSearch - Failed to write all records

I am currently writing a Scala application made of a Producer and a Consumer. The Producer gets some data from an external source and writes it into Kafka. The Consumer reads from Kafka and writes to Elasticsearch.

The consumer is based on Spark Streaming and every 5 seconds it fetches new messages from Kafka and writes them to ElasticSearch. The problem is that I cannot write to ES because I get a lot of errors like the one below:

  [ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: Error in TaskCompletionListener
  org.elasticsearch.hadoop.EsHadoopException: Failed to write all records [3/26560] (possibly ES was overloaded?). Bailing out...
      at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na]
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na]
      at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]
      at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

Note that the producer writes 6 messages every 15 seconds, so I really don't understand how this "overload" could happen (I even cleared the topic and flushed all the old messages; I thought it was due to an offset issue). The task that Spark Streaming runs every 5 seconds can be summarized with the following code:

  // imports needed by this snippet
  import kafka.serializer.{DefaultDecoder, StringDecoder}
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils
  import org.elasticsearch.spark._ // brings saveToEs into scope on RDDs

  // read Avro messages from the "wasp.raw" topic and convert them to JSON strings
  val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
  val convertedResult = result.map(k => (k._1, AvroToJsonUtil.avroToJson(k._2)))

  // TODO: remove the hardcoded resource ("yahoo/yahoo") parameter
  log.info(s"*** EXECUTING SPARK STREAMING TASK  + ${java.lang.System.currentTimeMillis()}***")

  // write each micro-batch to ES; the records are already JSON strings, hence es.input.json = true
  convertedResult.foreachRDD(rdd => {
    rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
  })

      

If I try to print the messages instead of sending them to ES, everything is fine and in fact I only see the 6 messages. Why can't I write to ES?

For the sake of completeness, I'm using this library to write to ES: elasticsearch-spark_2.10 with the latest beta.
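For reference, a minimal build.sbt dependency line for that connector; the version here is simply the one that appears in the stack trace above, so adjust it to whichever beta you are actually using:

  libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0.Beta3"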



3 answers


After many attempts I found a way to write to ElasticSearch without any errors: passing the parameter "es.batch.size.entries" -> "1" to the saveToEs method solved the problem. I don't understand why the default (or any other) batch size produces the error above, since I would expect an error if I tried to write more than the maximum allowed batch size, not less.
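For completeness, a minimal sketch of what that looks like with the code from the question (same "yahoo/yahoo" resource; the extra option is simply added to the config map passed to saveToEs):

  convertedResult.foreachRDD(rdd => {
    rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map(
      "es.input.json" -> "true",
      "es.batch.size.entries" -> "1" // flush after every single document
    ))
  })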



Also, I noticed that I was actually writing to ES, but not all of my messages were getting there: I was losing 1 to 3 messages per batch.



When I pushed a DataFrame to ES from Spark I got the same error message. Even with "es.batch.size.entries" -> "1" in the configuration I still had the same error. Once I increased the bulk thread pool in ES, the problem went away.

For example, the bulk thread pool settings in elasticsearch.yml:

threadpool.bulk.type: fixed
threadpool.bulk.size: 600
threadpool.bulk.queue_size: 30000
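
These are static node settings, so they only take effect after the Elasticsearch nodes are restarted. For context, a rough sketch of the DataFrame write this answer refers to, assuming a connector build with Spark SQL support (org.elasticsearch.spark.sql adds saveToEs on DataFrames); df and the "yahoo/yahoo" resource are placeholders borrowed from the question:

  import org.elasticsearch.spark.sql._ // assumption: the connector version in use ships Spark SQL support

  // df is whatever DataFrame is being pushed; keep the small batch size from the previous answer if needed
  df.saveToEs("yahoo/yahoo", Map("es.batch.size.entries" -> "1"))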

      



One possibility is that the cluster/shard status is RED. Please refer to this issue, which may be caused by unassigned replicas. After the status became GREEN, the API call completed successfully.







