Skip to content

Instantly share code, notes, and snippets.

@mmacphail
Created June 22, 2021 12:14
Show Gist options
  • Save mmacphail/62dd4889d6d6f6e9e26d504e8cb8e3ce to your computer and use it in GitHub Desktop.
Save mmacphail/62dd4889d6d6f6e9e26d504e8cb8e3ce to your computer and use it in GitHub Desktop.
Ideal java consumer
package clients;
import io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
public class Consumer {
static final String KAFKA_TOPIC = "driver-positions";
/**
* Java consumer.
*/
public static void main(String[] args) {
System.out.println("Starting Java Consumer.");
// Configure the group id, location of the bootstrap server, default deserializers,
// Confluent interceptors
final Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "java-consumer");
settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
settings.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, List.of(MonitoringConsumerInterceptor.class));
// performance
settings.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000");
settings.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "5000000");
// disable auto commit
// settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings);
// guarantie ?
// niveau de commit : auto-commit (5 secs)
// garantie un commit régulier après le traitement des messages
// at least once
try {
// Subscribe to our topic
consumer.subscribe(Arrays.asList(KAFKA_TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) { }
});
while (true) {
// TODO: Poll for available records
// Commit si plus de 5 secs avant le dernier commit
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
for (ConsumerRecord<String, String> record : records) {
// TODO: print the contents of the record
System.out.printf("Key: %s, Value: %s, Partition: %d\n", record.key(), record.value(), record.partition());
}
// consumer.commitSync();
consumer.commitAsync();
}
} finally {
// Clean up when the application exits or errors
System.out.println("Closing consumer.");
consumer.close();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment