Why does my Kafka consumer consume messages quickly on first launch but slow down significantly on later runs?
I am a student exploring and playing with Kafka. After following the examples in the Apache documentation, I am now playing around with the examples in the current trunk of the Kafka GitHub repository.
As of now, the example implements the "older" Consumer and does not use the new KafkaConsumer. Following the documentation, I wrote my own version using KafkaConsumer, thinking it would be faster.
This is a somewhat vague question, but in short: I am sending 5000 simple messages like "Message_CurrentMessageNumber" to the topic "test" and then using my consumer to fetch those messages and print them to stdout. When I run the example code with the provided consumer replaced by the new KafkaConsumer (v0.8.2 and up), it runs fairly fast and is comparable to the example on its first pass, but slows down significantly on every run after that.
I notice that my Kafka server often outputs
Rebalancing group1 generation 3 (kafka.coordinator.ConsumerCoordinator)
or similar messages, which leads me to believe that Kafka must be doing some sort of load balancing that slows things down, but I was wondering if anyone else has any idea what I am doing wrong.
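One way to check how often this actually happens is to count those lines in the broker log. The following is a minimal sketch, assuming each line has the shape quoted above ("Rebalancing <group> generation <n>"); RebalanceLogScan and generations() are hypothetical helper names, not part of Kafka, and the sample lines in main are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: extracts rebalance generations from log lines.
public class RebalanceLogScan {
    // Assumed line shape, taken from the message quoted above:
    // "Rebalancing group1 generation 3 (kafka.coordinator.ConsumerCoordinator)"
    private static final Pattern REBALANCE =
            Pattern.compile("Rebalancing (\\S+) generation (\\d+)");

    // Returns the generation numbers logged for one group, in the order they appear.
    public static List<Integer> generations(List<String> logLines, String groupId) {
        List<Integer> result = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = REBALANCE.matcher(line);
            if (m.find() && m.group(1).equals(groupId)) {
                result.add(Integer.parseInt(m.group(2)));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Read a log file if a path is given; otherwise use illustrative sample lines.
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : Arrays.asList(
                        "Rebalancing group1 generation 2 (kafka.coordinator.ConsumerCoordinator)",
                        "an unrelated log line",
                        "Rebalancing group1 generation 3 (kafka.coordinator.ConsumerCoordinator)");
        List<Integer> gens = generations(lines, "group1");
        System.out.println(gens.size() + " rebalances, generations " + gens);
    }
}
```

Running it against the broker log once after a fast run and once after a slow run would show whether the slow runs coincide with extra rebalances; pass whichever group id you want to inspect.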
package kafka.examples;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "newestGroup");
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    public void run() {
        // Poll forever and print every record. This loop never exits,
        // so the consumer is never closed.
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }
        // ConsumerRecords<Integer, String> records = consumer.poll(0);
        // for (ConsumerRecord<Integer, String> record : records) {
        //     System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
        // }
        // consumer.close();
    }
}
And this is how I start it:
package kafka.examples;

public class KafkaConsumerProducerDemo implements KafkaProperties
{
    public static void main(String[] args) {
        final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
        Producer producerThread = new Producer("test", isAsync);
        producerThread.start();
        AlternateConsumer consumerThread = new AlternateConsumer("test");
        consumerThread.start();
    }
}
Producer: the default producer, located here: https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java
It shouldn't be. If the setup is similar for your two consumers, you should expect better results with the new consumer, unless there is a problem in the client/consumer implementation, which seems to be the case here.
Can you share your test results, how often rebalancing is reported, and any pattern you observe (e.g. sluggish once at startup, after consuming a fixed number of messages, after the queue is empty, etc.)? Also, if you can, share some details about your consumer implementation.
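If it helps to gather those numbers, one rough approach is to bucket message arrival times into fixed windows. The sketch below assumes you call record(System.currentTimeMillis()) once per ConsumerRecord inside your poll loop; ThroughputWindows is a hypothetical helper, not part of the Kafka client, and the arrival times in main are simulated.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: counts how many messages arrive in each
// fixed-length time window, so a stall shows up as a run of empty windows.
public class ThroughputWindows {
    private final long startMs;
    private final long windowMs;
    private final List<Integer> counts = new ArrayList<>();

    public ThroughputWindows(long startMs, long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        this.startMs = startMs;
        this.windowMs = windowMs;
    }

    // Records one message received at nowMs (e.g. System.currentTimeMillis()).
    public void record(long nowMs) {
        if (nowMs < startMs) {
            throw new IllegalArgumentException("time is before the start of measurement");
        }
        int index = (int) ((nowMs - startMs) / windowMs);
        // Grow the list so that windows with no messages are counted as 0.
        while (counts.size() <= index) {
            counts.add(0);
        }
        counts.set(index, counts.get(index) + 1);
    }

    // Messages per window, oldest first.
    public List<Integer> counts() {
        return new ArrayList<>(counts);
    }

    public static void main(String[] args) {
        ThroughputWindows w = new ThroughputWindows(0, 1000);
        // Simulated arrivals: nothing for the first two seconds, then a burst.
        long[] arrivals = {2100, 2200, 2300, 3500};
        for (long t : arrivals) {
            w.record(t);
        }
        System.out.println(w.counts()); // prints [0, 0, 3, 1]
    }
}
```

A run of empty leading windows would suggest a one-off stall at startup, while uniformly low counts would suggest the slowdown is spread across the whole run.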