Purpose of sendOffsetsToTransaction in Kafka 0.11

The new version of Kafka (0.11) supports exactly-once semantics.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

I have a producer set up with Kafka transactional code in Java like this:

    // Register this producer's transactional.id before starting any transaction.
    producer.initTransactions();
    try {
        producer.beginTransaction();
        for (ProducerRecord<String, String> record : payload) {
            producer.send(record);
        }

        // Offsets to commit for the consumer group as part of this transaction.
        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>();
        groupCommit.put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
        producer.sendOffsetsToTransaction(groupCommit, "groupId");
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // Another producer with the same transactional.id has started; this one must be closed.
        producer.close();
    } catch (KafkaException e) {
        // Any other error: abort the transaction.
        producer.abortTransaction();
    }

      

I'm not really sure how to use sendOffsetsToTransaction or what its intended use case is. AFAIK, consumer groups are a consumer-side feature for reading in parallel across multiple threads.

The Javadoc says:

"Sends a list of consumed offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered consumed only if the transaction is committed successfully. This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern."

How would a producer come to have a list of consumed offsets? What's the point of it?



1 answer


This only applies to workflows where you consume messages and then produce new messages based on what you consumed. In such cases, this feature lets you commit the consumed offsets only if the producer transaction succeeds.

Without transactions, you commit offsets with Consumer#commitSync() or Consumer#commitAsync(). But if you use these consumer methods to commit offsets before producing the records derived from the data you consumed, you will have committed the offsets before you know whether the producer succeeded in sending its records. So, instead of committing your offsets with the consumer, you can call Producer#sendOffsetsToTransaction() on the downstream producer. This sends the offsets as part of the producer's transaction, and they are committed only if the entire transaction succeeds.
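As a rough illustration of that consume-transform-produce workflow, here is a minimal sketch against the 0.11-era client API. The class name, OUTPUT_TOPIC, and the upper-casing transform are placeholders I made up, not anything from the question. It assumes the consumer is already subscribed with enable.auto.commit=false and that the producer was configured with a transactional.id. Newer clients deprecate poll(long) in favor of poll(Duration) and add a sendOffsetsToTransaction overload that takes a ConsumerGroupMetadata, so adjust for your version.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ConsumeTransformProduce {

    // Hypothetical output topic name for this sketch.
    static final String OUTPUT_TOPIC = "output-topic";

    /** For each partition in the batch, the offset to commit: last offset read + 1. */
    static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(ConsumerRecords<String, String> batch) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : batch.partitions()) {
            List<ConsumerRecord<String, String>> records = batch.records(partition);
            long lastOffset = records.get(records.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }

    /**
     * Assumes: consumer is subscribed with enable.auto.commit=false,
     * producer was created with a transactional.id.
     */
    static void run(Consumer<String, String> consumer, Producer<String, String> producer, String groupId) {
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(100); // 0.11-era poll(long)
            if (batch.isEmpty()) {
                continue;
            }
            try {
                producer.beginTransaction();
                for (ConsumerRecord<String, String> record : batch) {
                    // Placeholder transform; assumes non-null values.
                    String transformed = record.value().toUpperCase();
                    producer.send(new ProducerRecord<>(OUTPUT_TOPIC, record.key(), transformed));
                }
                // The consumed offsets ride along with the produced records:
                // both are committed together, or neither is.
                producer.sendOffsetsToTransaction(offsetsToCommit(batch), groupId);
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // Another producer with the same transactional.id took over.
                producer.close();
                return;
            } catch (KafkaException e) {
                producer.abortTransaction();
                // Rewind to the start of the failed batch so it is polled again.
                for (TopicPartition partition : batch.partitions()) {
                    consumer.seek(partition, batch.records(partition).get(0).offset());
                }
            }
        }
    }
}
```

Note that the consumer never calls commitSync() or commitAsync() here; the producer is the only thing committing offsets. Consumers of the output topic would need isolation.level=read_committed to see only records from committed transactions.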



(Note: when you commit offsets, the offset you send must be the last offset read plus 1, so that future reads resume from the first record you have not yet read. This is true whether you commit with a consumer or a producer. See: KafkaProducer sendOffsetsToTransaction requires an offset + 1 to successfully commit the current offset.)
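To see why the +1 matters, here is a toy model in plain Java with no Kafka dependency. A list stands in for a partition and the list index plays the role of the offset; readFrom is a made-up helper that mimics a consumer resuming at the committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitOffsetDemo {

    /** Toy stand-in for a consumer: reading resumes AT the committed offset. */
    static List<String> readFrom(List<String> log, long committedOffset) {
        return new ArrayList<>(log.subList((int) committedOffset, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        long lastRead = 1; // "a" (offset 0) and "b" (offset 1) were already processed

        // Committing the last read offset itself: "b" is read a second time.
        System.out.println(readFrom(log, lastRead));     // [b, c, d]

        // Committing last read offset + 1: resumes at the first unread record.
        System.out.println(readFrom(log, lastRead + 1)); // [c, d]
    }
}
```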
