Is there a way to get the offset of each message consumed in Kafka Streams?

When Kafka Streams is killed and restarted, I want to avoid re-reading messages that have already been processed. To do that, I'd like to get the offset of each message along with its key and value, so that I can store it somewhere and use it to skip already-processed messages ...



1 answer


Yes, it is possible. See the FAQ entry at http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information.

I'll copy the key information below:



Accessing record metadata such as topic, partition, and offset information?

Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process() (the same applies to Transformer). The context is updated automatically to match the record that is currently being processed, which means that methods such as ProcessorContext#partition() and ProcessorContext#offset() always return the current record's metadata. Note that some caveats apply when accessing the context within punctuate(); see the Javadocs for details.
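
To make that concrete, here is a minimal sketch (not from the linked FAQ) against the classic Processor API, i.e. org.apache.kafka.streams.processor.Processor as it existed when this answer was written; newer releases replace it with the api.Processor/Record variant, and some older releases additionally require implementing the deprecated punctuate(long) method. The class name and the System.out logging are placeholders:

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative processor that reads topic/partition/offset per record.
public class OffsetAwareProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        // Store the context once; Kafka Streams keeps it pointing at the
        // record that is currently being processed.
        this.context = context;
    }

    @Override
    public void process(final String key, final String value) {
        // These always reflect the current record's metadata.
        final String topic = context.topic();
        final int partition = context.partition();
        final long offset = context.offset();

        // Persist the coordinates wherever you track processed messages
        // (printing here is just a placeholder).
        System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                topic, partition, offset, key, value);

        context.forward(key, value);
    }

    @Override
    public void close() {
        // No resources to release in this sketch.
    }
}
```

You would attach such a processor via Topology#addProcessor(), or from the DSL via KStream#process().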

If you use the DSL in combination with a custom Transformer, for example, you could transform the input records' values to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.
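
Here is a minimal sketch of that DSL route, assuming String keys/values and a quick-and-dirty "partition/offset:value" encoding (a real application would use a small wrapper type and a matching Serde). It relies on KStream#transform(TransformerSupplier, ...), which exists in the classic API but is deprecated in favor of process() in recent releases, and some older releases also require the deprecated punctuate(long) method on Transformer:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative helper that prefixes each value with its partition and offset.
public class OffsetEnricher {

    public static KStream<String, String> withOffsets(final KStream<String, String> input) {
        return input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {

            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<String, String> transform(final String key, final String value) {
                // Encode the record's coordinates into the value so that
                // downstream DSL operations can see them.
                final String enriched =
                        context.partition() + "/" + context.offset() + ":" + value;
                return KeyValue.pair(key, enriched);
            }

            @Override
            public void close() {
                // Nothing to clean up.
            }
        });
    }
}
```

Downstream map or filter steps can then parse the partition/offset back out of the value, for example to drop records at or below the last offset you recorded before the application was killed.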
