Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(producerRecord(“outputTopic”, record));
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
producer.commitTransaction();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment