Why is Kafka KTable missing entries?

I have a Java application that uses a KTable from Kafka Streams. Until recently I could fetch all of the data through the KTable, but then some messages seemed to disappear. There should be ~33k messages with unique keys.

When I look up messages by key, some of them are not returned. I am using a ReadOnlyKeyValueStore to retrieve messages:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);
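
One way to double-check how many entries actually get materialized in the local store is to iterate over it and count them. A rough diagnostic sketch, reusing the store handle from above:

long count = 0;
// the iterator over the whole store must be closed, hence try-with-resources
try (final KeyValueIterator<GenericRecord, GenericRecord> it = store.all()) {
    while (it.hasNext()) {
        it.next();
        count++;
    }
}
System.out.println("Entries materialized in " + storeName + ": " + count); // expected ~33k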


These are the configuration settings I have set for KafkaStreams.

final Properties config = new Properties();
// host:port of this instance, used for interactive queries across application instances
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
// disable record caching so updates are not buffered before being written to the store
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
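
For context, the KTable is materialized from the input topic in the usual way. A simplified sketch of the topology; the topic name is a placeholder, and storeName is the same store that is queried above:

final KStreamBuilder builder = new KStreamBuilder();
// materialize the topic into a local, queryable state store named storeName
final KTable<GenericRecord, GenericRecord> table = builder.table("input-topic", storeName);

final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();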


Kafka: 0.10.2.0-cp1
Confluent Platform: 3.2.0

Investigating this has led me to a rather disturbing finding. Using the REST Proxy, I read the partitions manually and found that some offsets return an error.

Request: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{
    "error_code": 50002,
    "message": "Kafka error: Fetch response contains an error code: 1"
}


No client, however, whether Java or command line, returns any error. The messages are simply missing, which results in missing data in the KTable. Everything was fine, and then, without warning, it seems that some of the messages were somehow corrupted.

I have two brokers, and all topics have a replication factor of 2 and are fully replicated. Both brokers return the same results when queried separately. Restarting the brokers does not help.

  • What could be the reason?
  • How can I detect this case in the client?




1 answer


By default, the Kafka broker topic config cleanup.policy is set to delete. Set it to compact to keep the latest message for each key. See the log compaction section of the Kafka documentation.

Deleting old messages does not change the minimum (earliest) offset, so trying to fetch a message below it causes an error; the REST Proxy only surfaces it as the very vague "error code: 1" (OFFSET_OUT_OF_RANGE). The Kafka Streams consumer simply starts reading from the minimum offset, so it reports no error at all. The only visible effect is missing data in the KTables.
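
One way to detect this from a client is to ask the broker for the beginning offsets of the partitions: if a partition's beginning offset has moved past 0 (or past an offset you still expect to read), old records have already been deleted. A rough sketch with the plain Java consumer; the topic name and bootstrap servers below are placeholders:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BeginningOffsetCheck {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final List<TopicPartition> partitions = consumer.partitionsFor("input-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            final Map<TopicPartition, Long> beginning = consumer.beginningOffsets(partitions);
            final Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

            for (final TopicPartition tp : partitions) {
                // beginning > 0 means the oldest records of this partition were already deleted
                System.out.printf("%s: beginning=%d, end=%d%n", tp, beginning.get(tp), end.get(tp));
            }
        }
    }
}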



As long as the application keeps running, all the data can still be read from the local state store, even after the messages have been deleted from Kafka itself. It only disappears once the local state is cleaned up.




