@krisskross
Created July 6, 2016 15:16
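import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Collect the last record of each partition; `consumerProps` and `topic` come from the surrounding code.
List<ConsumerRecord<byte[], byte[]>> lastRecords = new ArrayList<>();
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
    for (PartitionInfo info : consumer.partitionsFor(topic)) {
        TopicPartition tp = new TopicPartition(topic, info.partition());
        List<TopicPartition> list = Collections.singletonList(tp);
        consumer.assign(list);
        consumer.seekToEnd(list);
        // seekToEnd positions the consumer after the last record, so step back one offset.
        long end = consumer.position(tp);
        if (end == 0) continue; // nothing has been written to this partition
        consumer.seek(tp, end - 1);
        // poll may return nothing (timeout, or the record was removed by retention).
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
        if (!records.isEmpty()) lastRecords.add(records.iterator().next());
    }
}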
// Print partition, offset and timestamp of each record, ordered by partition.
lastRecords.stream()
    .sorted((r1, r2) -> Integer.compare(r1.partition(), r2.partition()))
    .forEach(r -> System.out.println(r.partition() + " " + r.offset() + " " + new Date(r.timestamp())));