Kafka: java client failed to send messages after trying x

I am trying to send messages to Kafka from a Java application.

All I can get is "Failed to send messages after 2 attempts":

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 2 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at test.Main.main(Main.java:26)

      

Kafka is running on a remote machine, so I added it in server.properties

(let's say the Kafka server IP is 192.168.0.1):

host.name=192.168.0.1
advertised.host.name=192.168.0.1
advertised.port=9092

      

Starting Kafka kafka_2.11-0.8.2.1

, so I used (I think) the corresponding Java client version:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.1</version>
</dependency>

      

Java code:

public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put("metadata.broker.list", "192.168.0.1:9092");
  properties.put("serializer.class", "test.StringEncoder");
  properties.put("key.serializer.class", "test.StringEncoder");
  properties.put("message.send.max.retries", "2");

  Producer<String, String> kafkaProducer = new Producer<String, String>(new ProducerConfig(properties));

  kafkaProducer.send(new KeyedMessage<String, String>(
      "LOG", 
      "Yo! " + new Date().toString()
  ));

  kafkaProducer.close();
}

      

The topic LOG

has already been created. I can post messages to Kafka from the same machine that is executing Java code using (and it works):

bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic LOG

      

When Java code crashes, nothing is registered by Kafka and Zookeeper.

Is there a specific parameter that I have missed?

+3


source to share


2 answers


Apache kafka has a new producer client which is better:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

      



Example: https://github.com/CameronGregory/kafka/blob/master/TestProducer.java

Your config appears to be fine. Is "test.StringEncoder" your custom class? try using "kafka.serializer.StringEncoder" instead

+2


source


Even I had the same problem because for me just restarting the Kafka server worked like a charm. You might want to try restarting it once.



+1


source







All Articles