@fr33m0nk
Forked from werneckpaiva/KafkaSeekByTimestamp.java
Created November 24, 2022 08:45
Kafka Consumer - seek by timestamp
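import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Assumes an existing KafkaConsumer `consumer`, a `topicName`, and a target `timestampMs` (epoch milliseconds)
// Get the list of partitions
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
// Transform PartitionInfo into TopicPartition
List<TopicPartition> topicPartitionList = partitionInfos.stream()
    .map(info -> new TopicPartition(topicName, info.partition()))
    .collect(Collectors.toList());
// Assign the consumer to these partitions
consumer.assign(topicPartitionList);
// Look up, per partition, the earliest offset whose timestamp is >= timestampMs
Map<TopicPartition, Long> partitionTimestampMap = topicPartitionList.stream()
    .collect(Collectors.toMap(tp -> tp, tp -> timestampMs));
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap);
// Force the consumer to seek to those offsets; the value is null when a partition has no record at or after timestampMs
partitionOffsetMap.forEach((tp, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        consumer.seek(tp, offsetAndTimestamp.offset());
    } else {
        consumer.seekToEnd(List.of(tp)); // one possible fallback: nothing that recent, so start from the end
    }
});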
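The snippet above uses `timestampMs` without defining it; `offsetsForTimes` expects epoch milliseconds. A minimal sketch of two ways to derive that value with `java.time`, where the class name `SeekTimestamps` and the helpers `fromIso` and `ago` are hypothetical names chosen for illustration, not part of the original gist:

```java
import java.time.Duration;
import java.time.Instant;

public class SeekTimestamps {
    // Epoch milliseconds for an ISO-8601 UTC instant such as "2022-11-24T08:45:00Z".
    public static long fromIso(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Epoch milliseconds for the moment `lookback` before `now`.
    // `now` is a parameter so the result is deterministic in tests.
    public static long ago(Instant now, Duration lookback) {
        return now.minus(lookback).toEpochMilli();
    }

    public static void main(String[] args) {
        // Example: replay everything from the last hour.
        long timestampMs = ago(Instant.now(), Duration.ofHours(1));
        System.out.println("Seek target (epoch ms): " + timestampMs);
    }
}
```

Either result can be passed as `timestampMs` when building the partition-to-timestamp map.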
@fr33m0nk
Author

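Another way: set a consumer group's committed offset directly with the AdminClient. Note that this moves the group to an explicit offset rather than looking one up by timestamp.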

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

String brokers = "localhost:9092";
String consumerGroupName = "test1337";
TopicPartition topicPartition = new TopicPartition("test", 0);
long offset = 4L;

Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
toOffset.put(topicPartition, new OffsetAndMetadata(offset));

// Create AdminClient
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

// try-with-resources closes the AdminClient even if one of the calls fails
try (AdminClient adminClient = AdminClient.create(properties)) {
  // Check offsets before altering
  Map<TopicPartition, OffsetAndMetadata> offsetsBeforeReset = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
  System.out.println("Before: " + offsetsBeforeReset);

  // Alter offsets (expected to fail while the group still has active members)
  adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();

  // Check offsets after altering
  Map<TopicPartition, OffsetAndMetadata> offsetsAfterReset = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
  System.out.println("After:  " + offsetsAfterReset);
} catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag for callers
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
}
