@werneckpaiva
Last active May 4, 2023 17:12
Kafka Consumer - seek by timestamp
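import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Assumes `consumer` is an already-configured KafkaConsumer, `topicName` is the topic
// to read, and `timestampMs` is the target time in epoch milliseconds.
// Get the list of partitions for the topic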
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
// Transform PartitionInfo into TopicPartition
List<TopicPartition> topicPartitionList = partitionInfos
    .stream()
    .map(info -> new TopicPartition(topicName, info.partition()))
    .collect(Collectors.toList());
// Manually assign these partitions to the consumer
consumer.assign(topicPartitionList);
// For each partition, look up the earliest offset whose timestamp is >= timestampMs
Map<TopicPartition, Long> partitionTimestampMap = topicPartitionList.stream()
    .collect(Collectors.toMap(tp -> tp, tp -> timestampMs));
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap);
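// Seek to those offsets. offsetsForTimes maps a partition to null when it has no
// record at or after the timestamp, so skip those entries to avoid a NullPointerException
partitionOffsetMap.forEach((tp, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        consumer.seek(tp, offsetAndTimestamp.offset());
    }
});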
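
A toy model of the timestamp lookup can help when reasoning about what `offsetsForTimes` returns: for each partition, the earliest offset whose timestamp is greater than or equal to the target, or null when no such record exists. The sketch below is plain Java with no Kafka dependency and is not Kafka's implementation. `TimestampSeekSketch`, `firstOffsetAtOrAfter`, and `startPositions` are hypothetical names, each partition is modeled as a list where index `i` holds the timestamp of the record at offset `i`, and falling back to the end of the log when nothing matches is one assumed policy among several.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the broker-side lookup; not part of the Kafka client API.
class TimestampSeekSketch {

    // Returns the earliest offset whose timestamp is >= targetMs, or null if none exists.
    // timestamps.get(i) is the timestamp of the record at offset i.
    // A linear scan is used because record timestamps are not guaranteed to be sorted.
    static Long firstOffsetAtOrAfter(List<Long> timestamps, long targetMs) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetMs) {
                return (long) offset;
            }
        }
        return null;
    }

    // Picks a start position per partition: the looked-up offset when there is one,
    // otherwise the end of the log (assumed policy: wait for new records only).
    static Map<Integer, Long> startPositions(Map<Integer, List<Long>> partitions, long targetMs) {
        Map<Integer, Long> positions = new HashMap<>();
        partitions.forEach((partition, timestamps) -> {
            Long found = firstOffsetAtOrAfter(timestamps, targetMs);
            positions.put(partition, found != null ? found : (long) timestamps.size());
        });
        return positions;
    }
}
```

With timestamps `[100, 200, 300]`, a target of `150` resolves to offset 1, while a target of `301` finds nothing, so `firstOffsetAtOrAfter` returns null and `startPositions` falls back to offset 3, the end of the modeled log.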