Kafka consumer fetch API does not return the right offset value

I created an experimental Kafka environment with 3 brokers and a topic with 3 partitions. I have a producer and a consumer. I want to change the offset of a partition for a specific consumer. I read in the Kafka documentation that the offset commit/fetch API can commit a specific offset or get the last offset read by the consumer. Here is the link for the API:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

I used the code from the page below to write my own code that fetches the offsets of a specific consumer. However, the fetch API returns -1 for the requested offset. Here is the sample code:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

I also read in the first link that "if there is no offset associated with the topic partition in this consumer group, the broker does not set an error code (as this is not really an error), but returns empty metadata and sets the offset field to -1."

However, I have produced multiple messages, and my consumer is consuming them and printing an offset for each message it reads.

I would be very grateful if anyone can help with this. I want to know which part of my code is wrong, or whether there is something wrong with the API. Please feel free to make any helpful comments. My code is essentially the same as in the link provided. However, if you need to see more of my code, please tell me and I will post it here.

Kafka version - 0.10.2.0

My Kafka config:

Broker 1: port 9093

Broker 2: port 9094

Broker 3: port 9095

Topic: "testpic3"

......................

Consumer configuration:

props.put("group.id", "test");

props.put("client.id", "MyConsumer");

      

................

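Here is my code:

import java.util.ArrayList;
import java.util.List;

import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;

public class KafkaOffsetManage {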

public static void main(String[] args) {


    BlockingChannel channel = new BlockingChannel("localhost", 9095,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "test";
    final String MY_CLIENTID = "MyConsumer";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
    final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
    System.out.println("+++++++++++++++++++++++++++");

    System.out.println(metadataResponse.errorCode());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different from the above channel's host, then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        System.out.println("Connected to Offset Manager");
        System.out.println(offsetManager.host() + ",  Port:"+ offsetManager.port());

    } else {
        // retry (after backoff)
    }



    // How to fetch offsets


    List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
    partitions.add(testPartition0);
    //partitions.add(testPartition1);
    OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
            MY_GROUP,
            partitions,
            (short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
            correlationId,
            MY_CLIENTID);
    try {
        channel.send(fetchRequest.underlying());
        OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
        OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);

        short offsetFetchErrorCode = result.error();
        if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
            channel.disconnect();
            // Go to step 1 and retry the offset fetch
        } else if (offsetFetchErrorCode  == ErrorMapping.OffsetsLoadInProgressCode()) {
            // retry the offset fetch (after backoff)
        } else {
            long retrievedOffset = result.offset();
            String retrievedMetadata = result.metadata();
            System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
            System.out.println(retrievedMetadata);
            System.out.println(result.toString());
        }
    }
    catch (Exception e) {
        channel.disconnect();
        // Go to step 1 and then retry offset fetch after backoff
    }
 }
}

      

The output of the code is here:

+++++++++++++++++++++++++++
0

Connected to Offset Manager

user-virtual-machine,  Port:9093
------------------------
The retrieved offset is:-1

OffsetMetadataAndError[-1,,3]

Process finished with exit code 0

      

One strange thing about the Kafka dependencies: when I add this dependency, my code doesn't recognize some of the classes in the program:

<artifactId>kafka_2.10</artifactId> 
<version>0.10.2.0</version> 

      

The classes "ConsumerMetadataRequest" and "ConsumerMetadataResponse" are not recognized.

So, I added this dependency:

<artifactId>kafka_2.10</artifactId> 
<version>0.8.2.0</version>

      

Thanks,


1 answer


I assume that you added

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.0</version>
</dependency>

      

as your dependency. This is Kafka itself (the broker). What you need in order to consume from / produce to Kafka 0.10.2 is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

      



To consume (and control the offset of a given consumer), use the KafkaConsumer class. It has detailed javadoc and is more convenient than the approach in "Committing and fetching consumer offsets in Kafka".

Also, if you still want to use the code from the example you gave, the problem you might have is:

List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
    partitions.add(testPartition0);

      

You only add one partition, and chances are there are no messages in that partition (you have 3 partitions, so the messages you produced may have gone to the other two). In Kafka, each partition is separate, and a consumer group has a separate offset for each partition.
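To make the per-partition point concrete, here is a minimal in-memory sketch. It does not talk to Kafka at all: the class GroupOffsetsSketch and its commit/fetch methods are hypothetical names invented for illustration, and the offsets 5 and 7 are made-up values. It only models the documented rule that a partition with no committed offset for the group reports -1, and it assumes the produced messages happened to land in partitions 1 and 2 only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model for illustration; not part of any Kafka API.
class GroupOffsetsSketch {
    // Committed offset per partition, for ONE consumer group and ONE topic.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Record that the group has committed `offset` for `partition`.
    void commit(int partition, long offset) {
        committed.put(partition, offset);
    }

    // Mirrors the documented rule: -1 when nothing was committed for the partition.
    long fetch(int partition) {
        return committed.getOrDefault(partition, -1L);
    }

    public static void main(String[] args) {
        GroupOffsetsSketch group = new GroupOffsetsSketch();
        // Assumption: the produced messages landed only in partitions 1 and 2,
        // so the consumer never committed anything for partition 0.
        group.commit(1, 5L);
        group.commit(2, 7L);
        for (int p = 0; p < 3; p++) {
            System.out.println("partition " + p + " -> " + group.fetch(p));
        }
    }
}
```

Under that assumption, asking only for partition 0 yields -1 even though the group has consumed messages. Requesting all three partitions in the fetch (the question's code has the second one commented out and never adds the third) would show whether this is what is happening.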
