Messages lost in Kafka + Spark Streaming

I have run into a problem using Spark Streaming with Kafka. My use case is as follows:

1. A Spark Streaming (DirectStream) application reads messages from a Kafka topic and processes them.
2. Based on the result, the application writes each processed message to a different Kafka topic: if the message is harmonized, it goes to the harmonized topic; otherwise it goes to the unharmonized topic.

The problem is that we lose some messages during streaming: not all incoming messages are written to the harmonized or unharmonized topics. For example, if the application receives 30 messages in one batch, it sometimes writes all of them to the output topics (the expected behavior), but sometimes it writes only 27 (3 messages are lost; this number varies).
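
To make the expected behavior concrete, here is a minimal sketch of the invariant described above: every received message should end up in exactly one of the two output topics. The names `Processed`, `route` and `missing` are hypothetical and only illustrate the check; they are not part of the application.

```scala
object BatchCheck {
  // Hypothetical record type; `harmonized` stands in for whatever the real processing decides.
  final case class Processed(payload: String, harmonized: Boolean)

  // Splits one batch by target topic and returns (harmonized, unharmonized).
  def route(batch: Seq[Processed]): (Seq[Processed], Seq[Processed]) =
    batch.partition(_.harmonized)

  // Number of messages unaccounted for after writing; 0 is the expected value.
  def missing(received: Int, writtenHarmonized: Int, writtenUnharmonized: Int): Int =
    received - (writtenHarmonized + writtenUnharmonized)
}
```

In the failing case from the example, 30 messages are received but only 27 are written in total, so `missing` returns 3 instead of 0.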

Versions as follows:

Spark 1.6.0

Kafka 0.9

The Kafka topic configuration is as follows:

number of brokers: 3

replication factor: 3

number of partitions: 3

Below are the properties we use for Kafka:

      val props = new Properties() 
      props.put("metadata.broker.list", properties.getProperty("metadataBrokerList")) 
      props.put("auto.offset.reset", properties.getProperty("autoOffsetReset")) 
      props.put("group.id", properties.getProperty("group.id")) 
      props.put("serializer.class", "kafka.serializer.StringEncoder") 
      props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized")) 
      props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized")) 
      props.put("acks", "all"); 
      props.put("retries", "5"); 
      props.put("request.required.acks", "-1") 

      

Below is the code snippet where we write the processed messages to Kafka:

      val schemaRdd2 = finalHarmonizedDF.toJSON

      schemaRdd2.foreachPartition { partition => 
        val producerConfig = new ProducerConfig(props) 
        val producer = new Producer[String, String](producerConfig) 

        partition.foreach { row => 
          if (debug) println(row.mkString) 
          val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"), 
            null, row.toString()) 
          producer.send(keyedMessage) 

        } 
        //hack, should be done with the flush 
        Thread.sleep(1000) 
        producer.close() 
      } 
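
The comment in the snippet says the sleep should really be a flush. A flush-based variant could be sketched roughly as follows. This is only a sketch under assumptions: it uses the newer `org.apache.kafka.clients.producer.KafkaProducer` instead of the old `kafka.producer.Producer`, it assumes kafka-clients 0.9.0 or newer is on the classpath (which is where `flush()` is available), and `FlushingWriter`, `newProducer` and `sendAll` are hypothetical names. It is not claimed to fix the loss; it only makes send failures visible instead of silent.

```scala
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}

object FlushingWriter {
  // Hypothetical factory; `brokers` is a host:port list such as the one in metadataBrokerList.
  def newProducer(brokers: String): Producer[String, String] = {
    val p = new Properties()
    p.put("bootstrap.servers", brokers)
    p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("acks", "all")
    p.put("retries", "5")
    new KafkaProducer[String, String](p)
  }

  // Sends every row, blocks until the producer has finished with all of them,
  // and returns (attempted, failed) so that failures are no longer silent.
  def sendAll(producer: Producer[String, String], topic: String, rows: Iterator[String]): (Int, Int) = {
    val failed = new AtomicInteger(0)
    var attempted = 0
    rows.foreach { row =>
      attempted += 1
      producer.send(new ProducerRecord[String, String](topic, row), new Callback {
        // Called once per record, with a non-null exception if the send failed.
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          if (exception != null) failed.incrementAndGet()
      })
    }
    producer.flush() // waits until all buffered records have completed, successfully or not
    (attempted, failed.get())
  }
}
```

Inside `foreachPartition`, one would create the producer with `newProducer`, call `sendAll(producer, topic, partition)`, log the two counts, and then call `producer.close()`. Comparing the attempted and failed counts per partition with the batch size should show whether the missing messages fail during the send or are lost before they reach the producer.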

      

We added Thread.sleep(1000) purely for testing purposes, but it does not solve the problem either :(

Any suggestions would be appreciated.
