Skip to content

Instantly share code, notes, and snippets.

@srdo
Created May 5, 2018 11:54
Show Gist options
  • Save srdo/e2a75d45a227f1a43e96cd38ab7194d3 to your computer and use it in GitHub Desktop.
Save srdo/e2a75d45a227f1a43e96cd38ab7194d3 to your computer and use it in GitHub Desktop.
Example KafkaConsumer read and commit
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-test");
props.put("enable.auto.commit", "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(TOPIC, 0));
consumer.assign(assignedPartitions);
consumer.seekToBeginning(assignedPartitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(2000);
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecs = records.records(tp);
ConsumerRecord<String, String> lastMessage = tpRecs.get(tpRecs.size() - 1);
LOG.info("Polled last message with offset {}, committing offset {}", lastMessage.offset(), lastMessage.offset() + 1);
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastMessage.offset() + 1)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment