Skip to content

Instantly share code, notes, and snippets.

@csaltos
Last active May 5, 2020 12:09
Show Gist options
  • Save csaltos/b3d79d43c9986024f43bac3481480d94 to your computer and use it in GitHub Desktop.
Save csaltos/b3d79d43c9986024f43bac3481480d94 to your computer and use it in GitHub Desktop.
package com.tutorial1;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
 * Demo Kafka consumer that logs every record read from {@code first_topic}.
 *
 * <p>The polling loop runs on a dedicated thread. A JVM shutdown hook wakes the consumer up and
 * blocks until the polling thread has closed it, so the process leaves the consumer group cleanly.
 */
public class ConsumerDemo {

    /**
     * Starts the polling worker and registers the shutdown hook that stops it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final ConsumerDemoWorker consumerDemoWorker = new ConsumerDemoWorker();
        new Thread(consumerDemoWorker).start();
        Runtime.getRuntime().addShutdownHook(new Thread(new ConsumerDemoCloser(consumerDemoWorker)));
    }

    /**
     * Polls {@code first_topic} in an endless loop and logs each record until {@link #shutdown()}
     * is called from another thread.
     */
    private static class ConsumerDemoWorker implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWorker.class);
        private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

        // Both fields are final and assigned before the worker can be shared with the shutdown
        // hook, so shutdown() can never observe them as null (the original assigned them inside
        // run(), which raced with an early shutdown).
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private final Consumer<String, String> consumer;

        /** Creates the underlying consumer; no subscription or polling happens until {@link #run()}. */
        ConsumerDemoWorker() {
            consumer = new KafkaConsumer<>(buildProperties());
        }

        /** Builds the consumer configuration used by this demo. */
        private static Properties buildProperties() {
            final Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return properties;
        }

        /**
         * Subscribes to the topic and polls until woken up. The consumer is always closed and the
         * latch always released on the way out, whatever the reason for leaving the loop.
         */
        @Override
        public void run() {
            try {
                consumer.subscribe(Collections.singleton("first_topic"));
                while (true) {
                    final ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_TIMEOUT);
                    for (final ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        log.info(
                                "Getting consumer record key: '{}', value: '{}', partition: {} and offset: {} at {}",
                                consumerRecord.key(),
                                consumerRecord.value(),
                                consumerRecord.partition(),
                                consumerRecord.offset(),
                                new Date(consumerRecord.timestamp()));
                    }
                }
            } catch (WakeupException e) {
                // Expected: shutdown() called wakeup() to break out of poll().
                log.info("Consumer poll woke up");
            } finally {
                try {
                    // Close on every exit path, not only after a wakeup, so an unexpected
                    // failure in subscribe()/poll() does not leak the consumer.
                    consumer.close();
                } finally {
                    // Release the latch even if close() itself throws; otherwise shutdown()
                    // would block forever.
                    countDownLatch.countDown();
                }
            }
        }

        /**
         * Asks the polling loop to stop and blocks until the polling thread has finished closing
         * the consumer. Intended to be called from a thread other than the polling thread.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        void shutdown() throws InterruptedException {
            // If no poll() is in progress yet, the next poll() call raises WakeupException instead.
            consumer.wakeup();
            countDownLatch.await();
            log.info("Consumer closed");
        }
    }

    /** Shutdown-hook task that stops a {@link ConsumerDemoWorker} and waits for it to finish. */
    private static class ConsumerDemoCloser implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCloser.class);
        private final ConsumerDemoWorker consumerDemoWorker;

        /**
         * @param consumerDemoWorker the worker to shut down when this task runs
         */
        ConsumerDemoCloser(final ConsumerDemoWorker consumerDemoWorker) {
            this.consumerDemoWorker = consumerDemoWorker;
        }

        /** Shuts the worker down; if interrupted while waiting, restores the interrupt flag. */
        @Override
        public void run() {
            try {
                consumerDemoWorker.shutdown();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever runs this task.
                Thread.currentThread().interrupt();
                log.error("Error shutting down consumer", e);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment