How to get last offset Number directly or filter off offsets in NodeJS Kafka Consumer

I am using a simple consumer / producer in Kafka NodeJS. My producer sends messages that I easily receive from consumers. Manufacturer and consumer code below. In Consumer, I expect offset.fetch () to give me all the offset ids today, but it doesn't. KIndly tell me how I can get the results of this method, and also mention a method that directly gives the last offset number in the subject of any section. I also want to know how I can filter offsets in an upcoming streaming. eg: If I only want to receive the last 20 messages from my consumer?

My producer:

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);
producer.on('ready', function () {
    producer.send([
        { topic: 'test', key:'key1', partition: 0, messages: ['banana','carrot','lemon','apple','melon','kiwi','mango','avacado'], attributes: 0}
        ], function (err, result) {
        console.log(err || result);
        process.exit();
    });
});

      

My consumer:

var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client('localhost:2181');
var offset = new kafka.Offset(client);
offset.fetch([
    { topic: 'test'  }
], function (err, data) {
    console.log(data);

});
var consumer = new Consumer(
        client,
        [
            { topic: 'test', partition: 0}
        ],

        {  autoCommit: false, autoCommitIntervalMs: 5000,  fetchMaxWaitMs: 100, fromOffset: true, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10
        }
    );
consumer.on('message', function (message) {
       console.log(message);
});

      

+3


source to share


2 answers


I have also been in a situation where I need to receive a massage from the current time. I did this by fetching the offset value of the last post for which I passed the current time and the number of offset values ​​to offset.fetch ([]) . Then, after getting the offset value, I just passed it to the Consumer API and got the message from now on.

 offset.fetch([
    { topic: topicname, partition: partition, time: Date.now(), maxNum: 1 }
 ], function (err, data) {
    // data 
    // { 'topicname': { 'partition': [999] } } 
       var consumer = new Consumer(
            client,
                  [
                   { topic:topicname,
                     partition:partition,
                     offset:data[topicname][partition][0]}
                    ],
                    {
                        autoCommit: false,
                        fromOffset: true,
                    }
           );
           consumer.on('message', function (message) {
                  console.log(message);
           });
      });

      



this will give you messages from the current time. you can calculate the time 20 minutes earlier and can get the offset value of the message that happened 20 minutes before.

+4


source


As a follow up to @anand's answer. In the offset payload, you can change the time field to -1 or -2 instead of date for the following results.

// When time is -2, your response will be {topic: {partition: [lowestOffset]}
{ topic: topicname, partition: partition, time: -1, maxNum: 1 }

// When time is -1, your response will be {topic: {partition: [highestOffset, lowestOffset]}
{ topic: topicname, partition: partition, time: -2, maxNum: 1 }

      



"-1" was especially helpful to me, as I can simply establish that my offset is equal to the highest known offset for testing.

0


source







All Articles