KafkaProducer sendOffsetsToTransaction needs offset + 1 to successfully commit the current offset

I am trying to execute a transaction in a Kafka Processor to make sure I do not reprocess the same message twice. Given a message (A), I need to create a list of messages to be produced to a different topic in a transaction, and I want to commit the offset of the original message (A) in that same transaction. In the documentation I found the Producer method sendOffsetsToTransaction, which seems to commit an offset as part of a transaction, so the offset is committed only if the transaction succeeds. This is the code inside the process() method of my Processor:

    producer.beginTransaction()
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    throw new RuntimeException("expected exception")

      

Unfortunately, with this code (which deliberately throws an exception on every execution), the message being processed (A) is reprocessed every time I restart the application after the exception.

I managed to get it to work by adding + 1 to the offset returned by this.context().offset() and redefining val offsetAndMetadata like this:

    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)

      

Is this normal behavior or am I doing something wrong?

Thanks :)



1 answer


Your code is correct: adding + 1 is the expected behavior.

The offsets you commit are the offsets of the messages you want to read next (not the offsets of the messages you last read).
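
The rule can be illustrated with a toy model of a single partition. This is only a sketch: the object CommittedOffsetDemo and the method recordsAfterRestart are hypothetical names made up for illustration and are not part of the Kafka client API. The only assumption it encodes is the one stated above, namely that a restarted consumer resumes reading at the committed offset itself.

```scala
// Toy model of one partition; hypothetical names, not the Kafka client API.
object CommittedOffsetDemo {
  // Records in the partition, indexed by offset: A is at offset 0.
  val log: Vector[String] = Vector("A", "B", "C")

  // After a restart, reading resumes AT the committed offset,
  // so every record from that position onward is delivered.
  def recordsAfterRestart(committedOffset: Long): Vector[String] =
    log.drop(committedOffset.toInt)

  def main(args: Array[String]): Unit = {
    val processedOffset = 0L // offset of A, as context().offset() would report it

    // Committing the processed offset itself: A is delivered again.
    println(recordsAfterRestart(processedOffset))     // Vector(A, B, C)

    // Committing processed offset + 1: processing resumes at B.
    println(recordsAfterRestart(processedOffset + 1)) // Vector(B, C)
  }
}
```

In the question's code, this corresponds to passing this.context().offset() + 1 to OffsetAndMetadata before calling sendOffsetsToTransaction.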



Compare: https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346
