Consumer does not receive message in Apache Kafka
I am writing an Apache Kafka consumer that subscribes to a topic on a Kafka cluster that is already running. My problem is that when my producer sends messages to the server, my consumer doesn't receive them. Here is the producer code:
Properties properties = new Properties();
properties.put("metadata.broker.list","Running kafka ip addr:9092");
properties.put("serializer.class","kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
String filePath="filepath";
File rootFile= new File(filePath);
Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE);
for (File file : allFiles) {
    StringBuilder sb = new StringBuilder();
    sb.append(file);
    KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC, sb.toString());
    System.out.println("sending msg from producer.." + sb.toString());
    producer.send(message);
}
producer.close();
Here's the consumer code
properties.put("bootstrap.servers","Running zookeaper ip addr:2181");
properties.put("group.id","test-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("topic = " + record.topic());
        System.out.println("partition = " + record.partition());
        System.out.println("offset = " + record.offset());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        System.out.printf("commit failed", e);
    }
}
I am using these dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>
I got all of this information from this link:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
When we run the consumer, it does not receive any messages. Any ideas would be appreciated.
For the producer:
properties.put("metadata.broker.list","Running kafka ip addr:9092");
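I think it should be "bootstrap.servers". That is the property name read by the new org.apache.kafka.clients.producer.KafkaProducer; the old kafka.javaapi.producer.Producer used in the question reads "metadata.broker.list", so the rename only applies if you also switch to the new producer class.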
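As a rough illustration of the producer side, here is a minimal sketch of the settings the new producer API expects. The class name ProducerPropsSketch, the method producerProps, and the address localhost:9092 are placeholders made up for this example; only the property keys and the StringSerializer class name come from Kafka itself. It builds a plain java.util.Properties object so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects settings for the new producer API.
class ProducerPropsSketch {

    // Builds producer settings; "brokers" is a comma-separated host:port list of Kafka brokers.
    static Properties producerProps(String brokers) {
        Properties props = new Properties();
        // The new producer reads bootstrap.servers instead of metadata.broker.list.
        props.put("bootstrap.servers", brokers);
        // The new producer takes key/value serializers instead of serializer.class.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed broker address; replace it with your own.
        Properties props = producerProps("localhost:9092");
        // With kafka-clients on the classpath, these properties would be passed to
        // new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props).
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The old producer in the question can stay as it is if you prefer; this only shows what the equivalent configuration looks like after switching.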
For the consumer:
properties.put("bootstrap.servers","Running zookeaper ip addr:2181");
bootstrap.servers should point to a broker, not to ZooKeeper.
The "problem" is that the consumer will just wait for a broker and will not fail if there is no broker at the specified host/port, so you get neither an error nor any messages.
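To make the consumer fix concrete, here is a minimal sketch of the consumer settings with bootstrap.servers pointing at a broker. The class ConsumerPropsSketch and both of its methods are hypothetical helpers written for this example, and localhost:9092 is an assumed broker address. The port check relies on 2181 being ZooKeeper's default client port. The auto.offset.reset entry is my own addition, not something from the question: with a new group id the default is to start at the latest offset, so messages produced before the consumer started would not be shown.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects settings for the new consumer API.
class ConsumerPropsSketch {

    // Returns true if any host:port entry uses ZooKeeper's default client port 2181.
    static boolean looksLikeZooKeeper(String servers) {
        for (String entry : servers.split(",")) {
            if (entry.trim().endsWith(":2181")) {
                return true;
            }
        }
        return false;
    }

    // Builds consumer settings; "brokers" must list Kafka brokers, not ZooKeeper nodes.
    static Properties consumerProps(String brokers, String groupId) {
        if (looksLikeZooKeeper(brokers)) {
            // Fail loudly here, because the consumer itself would only wait silently.
            throw new IllegalArgumentException(
                "bootstrap.servers looks like a ZooKeeper address: " + brokers);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        // Assumption for this sketch: read from the start of the topic when the group is new.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed broker address; replace it with your own.
        Properties props = consumerProps("localhost:9092", "test-group");
        // With kafka-clients on the classpath, these properties would be passed to
        // new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props).
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The rest of the consumer loop from the question can stay unchanged; only the properties differ.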
I am new to Kafka and Java, but I would like to suggest the following approach:
- Make sure the producer is actually writing to the topic by running the following command:
/usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginning
- If it is, you will probably need to focus on your consumer code. The Confluent guides are very helpful.