Skip to content

Instantly share code, notes, and snippets.

@Schachte
Created October 1, 2020 19:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Schachte/25a180bc77d1c607d59ca962ae197bd5 to your computer and use it in GitHub Desktop.
Save Schachte/25a180bc77d1c607d59ca962ae197bd5 to your computer and use it in GitHub Desktop.
Seek & Commit Logic for Kafka Custom Offset Manager
public void seekAndCommit() {
consumer.poll(Duration.ofSeconds(1));
Set<TopicPartition> consumerPartitionAssignments = consumer.assignment();
// This is the map of partitions the consumer owns as of latest poll
Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = new HashMap<>();
for (var entry : currentOffsets.entrySet()) {
TopicPartition partition = entry.getKey();
OffsetAndMetadata offset = entry.getValue();
if (consumerPartitionAssignments.contains(partition)) {
// Need a partition metadata map where the offset is incremented by 1
OffsetAndMetadata newOffsetAndMetadata = new OffsetAndMetadata(offset.offset() + 1,
offset.metadata());
try {
// We seek partition by partition to reduce duplicates if a single seek execution fails
consumer.seek(partition, offset.offset() + 1);
partitionToMetadataMap.put(partition, newOffsetAndMetadata);
} catch (Exception e) {
Logger.orcConsumerSeekFailure(traceId, partition.partition(), offset.offset() + 1, e);
}
}
}
try {
consumer.commitSync(partitionToMetadataMap);
} catch (Exception e) {
Logger.orcConsumerCommitFailure(traceId, e);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment