Consumer does not receive message in Apache Kafka

I am creating an Apache Kafka consumer to subscribe to another Kafka already running. Now my problem is when my producer outputs messages to the server ... my consumer doesn't receive them. Here I give the manufacturer code

        Properties properties = new Properties();
        properties.put("metadata.broker.list","Running kafka ip addr:9092");
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        String filePath="filepath";
        File rootFile= new File(filePath);
        Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE);
        for(File file : allFiles) {
            StringBuilder sb = new StringBuilder();
            sb.append(file);
            KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString());
            System.out.println("sending msg from producer.."+sb.toString());
            producer.send(message);
        }
           producer.close();

      

Here's the consumer code

         properties.put("bootstrap.servers","Running zookeaper ip addr:2181");
         properties.put("group.id","test-group");
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("enable.auto.commit", "false");

         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
         consumer.subscribe(Collections.singletonList(topicName)); 
         while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                {
                    System.out.println("topic = "+record.topic());
                    System.out.println("topic = "+record.partition());
                    System.out.println("topic = "+record.offset());
                }
                try {
                  consumer.commitSync(); 
                } catch (CommitFailedException e) {
                    System.out.printf("commit failed", e) ;
                }
            }

      

I am using this dependency:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>

      

I get all the information from this link:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

When we work with a consumer, we have not received any notifications from the consumer. Please give me any idea.

+3


source to share


2 answers


For the manufacturer:

   properties.put("metadata.broker.list","Running kafka ip addr:9092");

      

I think it should be "bootstrap.servers".

For the consumer:



properties.put("bootstrap.servers","Running zookeaper ip addr:2181");

      

bootstrap.servers

should point to broker, not ZK.

The "problem" is that the consumer will just wait for the broker, but will fail if there is no broker on the specified host / port.

0


source


I am new to Kafka and Java but I would like to suggest the following approach



  • Make sure the manufacturer is actually writing this thread using the following command /usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginning

    .
  • If so, you will probably need to focus on your consumer code. Confluent guides are very helpful.
0


source







All Articles