Skip to content

Instantly share code, notes, and snippets.

@yiquanzhou
Created October 26, 2016 09:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yiquanzhou/a94569a2c4ec8992444c83f3c393f596 to your computer and use it in GitHub Desktop.
Save yiquanzhou/a94569a2c4ec8992444c83f3c393f596 to your computer and use it in GitHub Desktop.
/**
 * Small command-line harness that subscribes to the {@code connect-test} topic
 * on a local broker and prints what it polls.
 *
 * <p>The poll loop runs on the main thread. A JVM shutdown hook wakes the
 * consumer up so that the loop exits and the consumer is closed when the
 * process is asked to terminate (for example with Ctrl-C).
 */
public class KafkaConsumerTest {

    /**
     * Starts a single {@link ConsumerLoop} for the {@code connect-test} topic
     * in consumer group {@code group} and blocks until it terminates.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        List<String> topics = new ArrayList<>();
        topics.add("connect-test");

        final ConsumerLoop consumerLoop = new ConsumerLoop("group", topics);

        // Without this hook nothing ever calls shutdown(), so the loop below
        // could never leave poll() cleanly and consumer.close() would be
        // skipped when the process is terminated.
        final Thread pollingThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumerLoop.shutdown();
                try {
                    // Give the polling thread time to run its finally block
                    // (consumer.close()) before the JVM halts.
                    pollingThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        consumerLoop.run();
    }

    /**
     * Runnable that owns one {@link KafkaConsumer}, subscribes it to a fixed
     * list of topics and polls until {@link #shutdown()} is called.
     */
    static class ConsumerLoop implements Runnable {
        private final KafkaConsumer<String, String> consumer;
        private final List<String> topics;

        /**
         * Creates the consumer against {@code localhost:9092} with String
         * key/value deserializers.
         *
         * @param groupId value used for the consumer's {@code group.id}
         * @param topics  topics to subscribe to when {@link #run()} starts;
         *                copied, so later changes by the caller have no effect
         */
        public ConsumerLoop(String groupId, List<String> topics) {
            // Defensive copy: the subscription must not change if the caller
            // mutates its list after construction.
            this.topics = new ArrayList<>(topics);

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", groupId);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<>(props);
        }

        /**
         * Subscribes to the configured topics and polls forever, printing the
         * size of every batch and the partition/offset/value of every record.
         * The consumer is always closed on the way out.
         */
        @Override
        public void run() {
            try {
                consumer.subscribe(topics);
                while (true) {
                    // NOTE(review): newer kafka-clients releases deprecate
                    // poll(long) in favour of poll(Duration) - confirm the
                    // client version in use before switching.
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    System.out.println("polled " + records.count() + " records");
                    for (ConsumerRecord<String, String> record : records) {
                        Map<String, Object> data = new HashMap<>();
                        data.put("partition", record.partition());
                        data.put("offset", record.offset());
                        data.put("value", record.value());
                        // Previously the map was built and then discarded,
                        // so consumed records were never visible.
                        System.out.println(data);
                    }
                }
            } catch (WakeupException e) {
                // Expected: shutdown() woke the consumer up to end the loop.
            } finally {
                consumer.close();
            }
        }

        /**
         * Requests termination of {@link #run()} by waking the consumer up.
         * Intended to be called from a thread other than the polling thread.
         */
        public void shutdown() {
            consumer.wakeup();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment