Skip to content

Instantly share code, notes, and snippets.

@priyanshus
Created December 15, 2020 14:07
Show Gist options
  • Save priyanshus/60919e46a32e4acd999159b064e75302 to your computer and use it in GitHub Desktop.
Save priyanshus/60919e46a32e4acd999159b064e75302 to your computer and use it in GitHub Desktop.
Kafka Consumer For Partitions
List<String> consumer(String topic) {
List<String> events = new ArrayList<>();
List<PartitionInfo> partitionInfos = null;
List<TopicPartition> partitions = new ArrayList<>();
partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(),
partition.partition()));
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
events.add(record.value());
}
consumer.commitSync();
if (!records.isEmpty()) {
break;
}
}
}
return events;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment