Is there a way to get the offset of each message consumed in Kafka Streams?
When my Kafka Streams application is killed and restarted, I want to avoid reprocessing messages that were already handled. To do that, I want to get the offset of each message along with its key and value, so I can store it somewhere and use it to skip already-processed messages.
Yes, it is possible. See the FAQ section at http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information
I'll copy the key information below:
Accessing record metadata such as topic, partition, and offset information?

Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

With the Processor API, you can access record metadata via the `ProcessorContext`. You can store a reference to the context in an instance field of your `Processor` during `Processor#init()`, and then query the processor context within `Processor#process()` (the same goes for `Transformer`). The context is updated automatically to match the record that is currently being processed, which means that methods such as `ProcessorContext#partition()` always return the current record's metadata. Some caveats apply when accessing the processor context within `punctuate()`; see the Javadocs for details.
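Here is a minimal sketch of that pattern, assuming the classic `org.apache.kafka.streams.processor.Processor` API (newer releases deprecate it in favor of `api.Processor`, which takes a `Record` instead); the class name, the `String` types, and the `printf` are illustrative only:

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OffsetTrackingProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep a reference; the context always reflects the record
        // currently being processed.
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Read the current record's metadata and, for example,
        // persist it alongside the key and value.
        System.out.printf("topic=%s partition=%d offset=%d key=%s%n",
                context.topic(), context.partition(), context.offset(), key);
    }

    @Override
    public void close() { }
}
```

You can attach it to a DSL stream with `stream.process(OffsetTrackingProcessor::new)`.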
If you use the DSL combined with a custom `Transformer`, for example, you could transform the input records' values to also include partition and offset information, and subsequent DSL operations such as `map` or `filter` could then leverage that information, as sketched below.
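Here is one way that could look, again assuming the classic `Transformer` API; the class name and the `"partition:offset|value"` encoding are made up for illustration:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class EnrichWithOffsetTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Embed the record's partition and offset into the value
        // (the "partition:offset|value" format is illustrative).
        String enriched =
                context.partition() + ":" + context.offset() + "|" + value;
        return KeyValue.pair(key, enriched);
    }

    @Override
    public void close() { }
}
```

Downstream you could then write something like `stream.transform(EnrichWithOffsetTransformer::new).filter(...)`, where the filter parses the embedded partition and offset and drops records you have already recorded as processed.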