Kafka consumer example
// https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Automatic offset committing: offsets are committed in the background
// every auto.commit.interval.ms
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
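
Note: the poll(long) overload used above is deprecated as of the 2.0 client the javadoc link refers to. A minimal sketch of the same loop with the Duration overload, assuming the same props and subscription as above:

import java.time.Duration;

// poll(Duration) bounds the total time spent inside poll, including group
// coordinator work; poll(long) only bounded the wait for new data
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}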
// https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Rewinds every partition of a topic to a given offset ("from") and drains the
// messages from there; kafkaBootstrapServers, kafkaKeyDeserializer,
// kafkaValueDeserializer, topic, from and logger come from the enclosing class
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaKeyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaValueDeserializer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (final KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
    c.subscribe(Arrays.asList(topic));
    // poll once so the group coordinator assigns partitions; seeking before an
    // assignment exists fails with "No current assignment for partition"
    c.poll(0);
    final List<TopicPartition> partitions = c.partitionsFor(topic)
            .stream()
            .map(partition -> new TopicPartition(partition.topic(), partition.partition()))
            .collect(Collectors.toList());
    final Map<TopicPartition, Long> offsets = c.endOffsets(partitions);
    for (Map.Entry<TopicPartition, Long> partition : offsets.entrySet()) {
        // cap the seek at the partition's end offset
        if (partition.getValue() < from) {
            c.seek(partition.getKey(), partition.getValue());
        } else {
            c.seek(partition.getKey(), from);
        }
        logger.info("status=seek, key={}, offset={}", partition.getKey(), partition.getValue());
    }
    final StringBuilder msgs = new StringBuilder();
    // give up after ten consecutive empty polls
    for (int emptyLoops = 0; emptyLoops < 10; ) {
        final ConsumerRecords<String, String> consumedRecords = c.poll(1000);
        if (consumedRecords.count() == 0) {
            emptyLoops++;
        } else {
            emptyLoops = 0;
            // accumulate the consumed values so there is something to return
            for (ConsumerRecord<String, String> record : consumedRecords) {
                msgs.append(record.value()).append('\n');
            }
        }
        c.commitAsync();
    }
    return msgs.toString();
}
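
The warm-up poll(0) above is needed only because subscribe() leaves partition assignment to the group coordinator, which is what triggers the "No current assignment for partition" error when seeking too early. A minimal alternative sketch, reusing props, topic and from from the snippet above: with manual assign() the consumer owns its partitions immediately, so seek() is safe right away.

// manual assignment variant: no rebalance protocol, no warm-up poll needed
try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
    List<TopicPartition> partitions = c.partitionsFor(topic).stream()
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
    c.assign(partitions);   // take ownership directly, bypassing the coordinator
    for (TopicPartition tp : partitions) {
        c.seek(tp, from);   // safe: the assignment already exists
    }
}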
// https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Manual offset committing: offsets are committed only after the batch has
// been persisted, so a crash may replay records but never lose them
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer); // application-supplied persistence
        consumer.commitSync();
        buffer.clear();
    }
}
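
For finer-grained control than one commitSync() after the whole batch, the consumer can also commit an explicit offset per partition; the sketch below follows the manual-offset-control example in the KafkaConsumer javadoc linked above. Note that the committed offset must be the offset of the next record to read, hence lastOffset + 1.

import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // commit the position of the next message to consume for this partition
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}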