Forked from werneckpaiva/KafkaSeekByTimestamp.java
Created November 24, 2022 08:45
Kafka Consumer - seek by timestamp
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Assumes consumer, topicName and timestampMs are already defined by the caller
// Get the list of partitions for the topic
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
// Transform PartitionInfo into TopicPartition
List<TopicPartition> topicPartitionList = partitionInfos
        .stream()
        .map(info -> new TopicPartition(topicName, info.partition()))
        .collect(Collectors.toList());
// Manually assign these partitions to the consumer
consumer.assign(topicPartitionList);
// Look up, per partition, the earliest offset whose timestamp is >= timestampMs
Map<TopicPartition, Long> partitionTimestampMap = topicPartitionList.stream()
        .collect(Collectors.toMap(tp -> tp, tp -> timestampMs));
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap);
// Seek to those offsets; the value is null when a partition has no record at or after timestampMs
partitionOffsetMap.forEach((tp, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        consumer.seek(tp, offsetAndTimestamp.offset());
    }
});
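
To make the lookup semantics easier to reason about, here is a minimal plain-Java sketch of what `offsetsForTimes` resolves for a single partition: the earliest offset whose record timestamp is greater than or equal to the requested timestamp, or nothing when no such record exists. The class `OffsetForTimeSketch`, the method `offsetForTime`, and the list-of-timestamps model of a partition are hypothetical and only illustrate the rule; this is not how the broker performs the lookup.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for one partition's log:
// list index = offset, list value = record timestamp in milliseconds.
class OffsetForTimeSketch {

    // Returns the earliest offset whose timestamp is >= targetMs,
    // or an empty OptionalLong when no record qualifies
    // (the case where the real API maps the partition to null).
    static OptionalLong offsetForTime(List<Long> timestampsByOffset, long targetMs) {
        for (int offset = 0; offset < timestampsByOffset.size(); offset++) {
            if (timestampsByOffset.get(offset) >= targetMs) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        List<Long> partition = Arrays.asList(1000L, 2000L, 2000L, 3500L);
        System.out.println(offsetForTime(partition, 2000L)); // OptionalLong[1]
        System.out.println(offsetForTime(partition, 2500L)); // OptionalLong[3]
        System.out.println(offsetForTime(partition, 9999L)); // OptionalLong.empty
    }
}
```

The empty result in the last call corresponds to the case where a partition has no record at or after the timestamp, which is why the seek step needs a null check before calling `offset()`.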
Another way: