Why does my Kafka consumer consume messages quickly on first launch but slow down significantly on later runs?

I am a student exploring and experimenting with Kafka. After following the examples in the Apache documentation, I am now playing around with the examples in the trunk of the current GitHub repository.

As of now, the examples implement the "older" version of the Consumer and do not use the new KafkaConsumer. Following the documentation, I wrote my own version using KafkaConsumer, thinking it would be faster.

This is a somewhat vague question, but for context: I am producing 5000 simple messages like "Message_CurrentMessageNumber" to the topic "test" and then using my consumer to fetch those messages and print them to stdout. When I run the example code with the provided consumer replaced by the new KafkaConsumer (v0.8.2 and up), the first run is fairly fast and comparable to the example, but every run after that is significantly slower.
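To compare the first run with later runs fairly, it may help to confirm that each run actually receives every one of the 5000 messages, and no duplicates. Below is a minimal, stdlib-only sketch with a hypothetical MessageTally class (not part of Kafka), assuming the payloads are "Message_" followed by consecutive integers starting at some known firstNumber.

```java
import java.util.BitSet;

// Hypothetical helper (not part of Kafka): tracks which "Message_N" payloads a run
// has seen, so missing, duplicate, or unexpected messages stand out.
class MessageTally {
    private static final String PREFIX = "Message_";
    private final int firstNumber; // assumed number of the first message produced
    private final int expected;    // assumed count of consecutive messages, e.g. 5000
    private final BitSet seen;
    private int duplicates = 0;
    private int unexpected = 0;    // unparseable or out-of-range payloads

    MessageTally(int firstNumber, int expected) {
        this.firstNumber = firstNumber;
        this.expected = expected;
        this.seen = new BitSet(expected);
    }

    // Record one consumed payload, such as "Message_42".
    void record(String value) {
        int index = indexOf(value);
        if (index < 0) {
            unexpected++;
        } else if (seen.get(index)) {
            duplicates++;
        } else {
            seen.set(index);
        }
    }

    // Maps a payload to its offset from firstNumber, or -1 if it does not fit the pattern.
    private int indexOf(String value) {
        if (value == null || !value.startsWith(PREFIX)) {
            return -1;
        }
        try {
            int index = Integer.parseInt(value.substring(PREFIX.length())) - firstNumber;
            return (index >= 0 && index < expected) ? index : -1;
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    int distinct() { return seen.cardinality(); }
    int missing() { return expected - seen.cardinality(); }
    int duplicates() { return duplicates; }
    int unexpected() { return unexpected; }
}
```

Calling record(record.value()) inside the for loop of the consumer and printing the four counters at the end of a run would show whether a slow run is still reading the same data.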

I notice that my Kafka server outputs:

Rebalancing group1 generation 3 (kafka.coordinator.ConsumerCoordinator)

This message, along with posts like this one, leads me to believe that Kafka must be doing some sort of load balancing that slows things down, but I was wondering whether anyone has an idea of what I am doing wrong.
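To put a number on how often this happens, the broker log can be scanned for lines like the one above. This is a minimal, stdlib-only sketch; the class name RebalanceLogCounter is hypothetical, and it assumes each rebalance is logged on a single line containing "Rebalancing <group> generation <n>", as in the excerpt above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts broker log lines of the form
// "Rebalancing <group> generation <n>" per consumer group.
class RebalanceLogCounter {
    private static final Pattern REBALANCE =
        Pattern.compile("Rebalancing (\\S+) generation (\\d+)");

    // Returns how many rebalance lines appeared for each group, in first-seen order.
    static Map<String, Integer> countByGroup(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = REBALANCE.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    // Returns the highest generation number logged for the group, or -1 if none.
    static int latestGeneration(List<String> lines, String group) {
        int latest = -1;
        for (String line : lines) {
            Matcher m = REBALANCE.matcher(line);
            if (m.find() && m.group(1).equals(group)) {
                latest = Math.max(latest, Integer.parseInt(m.group(2)));
            }
        }
        return latest;
    }
}
```

The lines could be loaded with java.nio.file.Files.readAllLines(Paths.get("server.log")), where the file name is an assumption that depends on your broker's log4j configuration. Comparing the counts after a fast first run and after a slow second run would show whether the slowdown coincides with extra rebalances.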

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AlternateConsumer extends Thread {

    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic) {
        // Consumer configuration: broker address, group, commit and session settings, deserializers.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "newestGroup");
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    public void run() {
        // Poll forever, printing every record received.
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }

        // Note: code here is never reached while the loop above runs forever.
        // ConsumerRecords<Integer, String> records = consumer.poll(0);
        // for (ConsumerRecord<Integer, String> record : records) {
        //  System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
        // }
        // consumer.close();
    }
}


To kick things off, I run:

package kafka.examples;

public class KafkaConsumerProducerDemo implements KafkaProperties
{
  public static void main(String[] args) {
    final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;

    Producer producerThread = new Producer("test", isAsync);
    producerThread.start();

    AlternateConsumer consumerThread = new AlternateConsumer("test");
    consumerThread.start();
  } 
}


Producer: the default producer located here: https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java



1 answer


It shouldn't be. If the setup is similar for your two consumers, you should expect better results with the new consumer, unless there is a problem in the client/consumer implementation, which seems to be the case here.



Can you share your test results, how often rebalancing is reported, and any pattern you observe (e.g. slow once at startup, slow after consuming a fixed number of messages, slow after the queue is empty, etc.)? It would also help if you could share some details about your consumer implementation.
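One way to gather the pattern asked about above is to time every poll in the consumer loop. This is a minimal sketch with a hypothetical PollStats class (not part of Kafka); it takes timestamps as arguments so it can be checked without a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: records the time and batch size of each poll so slow phases
// (at startup, after N messages, etc.) can be spotted after a run.
class PollStats {
    private final long startNanos;
    private final List<long[]> polls = new ArrayList<>(); // each entry: {timestampNanos, recordCount}

    PollStats(long startNanos) {
        this.startNanos = startNanos;
    }

    // Call once after every poll with the current time and the number of records returned.
    void recordPoll(long nowNanos, int recordCount) {
        polls.add(new long[] {nowNanos, recordCount});
    }

    long totalRecords() {
        long total = 0;
        for (long[] p : polls) {
            total += p[1];
        }
        return total;
    }

    // Milliseconds from start until the first non-empty poll, or -1 if nothing arrived.
    long millisToFirstRecord() {
        for (long[] p : polls) {
            if (p[1] > 0) {
                return (p[0] - startNanos) / 1_000_000;
            }
        }
        return -1;
    }

    // Longest wait, in ms, between consecutive non-empty polls (the first wait is measured from start).
    long longestGapMillis() {
        long prev = startNanos;
        long longest = 0;
        for (long[] p : polls) {
            if (p[1] > 0) {
                longest = Math.max(longest, p[0] - prev);
                prev = p[0];
            }
        }
        return longest / 1_000_000;
    }

    // How many records had arrived before the longest gap began (0 means a startup stall).
    long recordsBeforeLongestGap() {
        long prev = startNanos, longest = -1, seenSoFar = 0, before = 0;
        for (long[] p : polls) {
            if (p[1] > 0) {
                if (p[0] - prev > longest) {
                    longest = p[0] - prev;
                    before = seenSoFar;
                }
                seenSoFar += p[1];
                prev = p[0];
            }
        }
        return before;
    }
}
```

In the run() loop, this could be wired up as stats.recordPoll(System.nanoTime(), records.count()) right after consumer.poll(100), assuming your client version's ConsumerRecords exposes count() (the released 0.9 client does). A large millisToFirstRecord() together with recordsBeforeLongestGap() of 0 would suggest a stall at startup, while a large gap later in the run would point at something else.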
