Kafka broker memory leak caused by many consumers

I am building a Java 8 application that queries a Kafka topic for exactly one message. Each request creates a new Consumer object (independent of any existing Consumer objects) that polls my Kafka topic, gets one record, and is then closed. This happens ~200k times a day and each request is independent of the others, so I don't think I can reuse consumers. Basically, a user requests a message from the topic, a consumer is created for them, and then it is closed. This happens on average ~2 times per second, but randomly: it could be 10 times per second or once per hour, there is no way to know.

After a while, the heap size on the Kafka server (not the server running the code, but the actual server running Kafka) becomes huge and garbage collection cannot reclaim it. Eventually, more CPU time is spent on GC than on anything else, and nothing works until I restart Kafka.

Here's a simplified version of the code that causes the problem; the while(true) loop approximates the real behavior (in production, consumers are not created in a loop, but on demand whenever a user requests a message from the topic):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
// deserializers are required for KafkaConsumer<String, String> to be constructed
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("session.timeout.ms", "30000");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

while (true) {
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("TOPIC", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToEnd(Arrays.asList(tp));

    // I've narrowed down the memory leak to this line
    ConsumerRecords<String, String> cr = consumer.poll(1000);
    // If I remove this line ^, the memory leak does not happen

    /* CODE TO GET ONE RECORD */

    consumer.unsubscribe();
    consumer.close();
}

      

Running this code on 20 JVMs results in a memory leak after about 20 minutes. Here is what the heap size (blue) and GC pause time (green) look like on the Kafka server: [image: KafkaMemoryLeak]

Am I doing something wrong (or is there a better way to approach this), or is this a bug in Kafka that is triggered by creating and closing many consumers?

I am running Kafka 0.10.2.1 on the client side and Kafka 0.10.2.0 on the server.


2 answers


Regardless of the number and frequency of requests you receive, you can reuse KafkaConsumer instances. You can poll only when a request comes in; you don't have to create and close a consumer every time. A sketch of this approach is shown below.
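
Here is a minimal sketch of that idea, reusing the assign/seekToEnd/poll flow from the question. The OnDemandReader class and fetchOne method names are made up for illustration, and it assumes all calls happen on a single thread, since KafkaConsumer is not thread-safe:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical wrapper: one long-lived consumer, polled only when a request arrives.
public class OnDemandReader implements AutoCloseable {
    private final KafkaConsumer<String, String> consumer;
    private final TopicPartition tp = new TopicPartition("TOPIC", 0);

    public OnDemandReader(Properties props) {
        consumer = new KafkaConsumer<>(props);
        consumer.assign(Arrays.asList(tp));   // assign once, not once per request
    }

    // Call this once per user request. KafkaConsumer is not thread-safe, so
    // all calls must come from the same thread (or be synchronized externally).
    public ConsumerRecords<String, String> fetchOne() {
        consumer.seekToEnd(Arrays.asList(tp));
        return consumer.poll(1000);           // same poll as in the question, on a reused consumer
    }

    @Override
    public void close() {
        consumer.close();                     // close once, e.g. on application shutdown
    }
}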



Having said that, your usage of consumers may be exposing a memory management problem on the broker if heap usage keeps growing and the GC cannot reclaim it. I have seen issues reported where the broker runs out of direct memory when producers are recycled very frequently, so there is likely room for improvement there. It is probably best to raise a ticket at issues.apache.org to have it looked into.



You poll Kafka ~200 thousand times a day, i.e. ~8k times per hour / ~140 times per minute / ~2 times per second. Why are you creating (and closing) a new consumer instance every time? Just schedule the KafkaConsumer to run at your desired interval (you can use the JDK ScheduledExecutorService for that) and reuse the same consumer instance, as in the sketch below.
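
A rough sketch of that approach, assuming a 500 ms interval (roughly matching the ~2 requests per second from the question); the topic, partition, and what is done with each record are placeholders. The single-threaded scheduler also keeps all KafkaConsumer calls on one thread:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ScheduledPoller {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "SERVER_IP:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // One consumer instance, created once and reused for every poll.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("TOPIC", 0);
        consumer.assign(Arrays.asList(tp));

        // Single-threaded scheduler polls the same consumer every 500 ms.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // hand the record to whoever requested it
                System.out.println(record.value());
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}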


