Skip to content

Instantly share code, notes, and snippets.

@buuhsmead
Created March 7, 2022 09:20
Show Gist options
  • Save buuhsmead/bf72b2c4cfa53ccae7eab7eca95334fd to your computer and use it in GitHub Desktop.
Save buuhsmead/bf72b2c4cfa53ccae7eab7eca95334fd to your computer and use it in GitHub Desktop.
Direct low-level Kafka consumer test
package com.github.buuhsmead.quarkus.messaging.quickstarts;
import io.quarkus.test.junit.QuarkusTest;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
// Using the InMemoryConnector in tests needs an upstream fix; see https://github.com/smallrye/smallrye-reactive-messaging/issues/1571
/**
 * Manual smoke test that connects a plain {@link KafkaConsumer} to the broker at
 * {@code 127.0.0.1:9092}, subscribes to one topic and prints every record it receives.
 *
 * <p>The test makes no assertions; it only dumps traffic to standard output.
 * NOTE(review): presumably a Kafka broker must already be listening on that address —
 * confirm against the project's test setup.
 */
@QuarkusTest
class ReactiveMessagingExampleTest {

    /** Longest time a single {@code poll} call blocks while waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    /**
     * Total time the test keeps polling before it returns. The loop is bounded so the
     * test terminates instead of blocking the build forever.
     */
    private static final Duration LISTEN_WINDOW = Duration.ofSeconds(30);

    /**
     * Polls the {@code bankid-signing-in} topic for {@link #LISTEN_WINDOW} and prints the
     * offset, key and value of each record received.
     */
    @Test
    public void testStatus() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "test");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "bankid-signing-in";

        // try-with-resources closes the consumer even if polling throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(topic));

            // Compare via subtraction so the check stays correct if nanoTime() wraps.
            long deadline = System.nanoTime() + LISTEN_WINDOW.toNanos();
            while (System.nanoTime() - deadline < 0) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                System.out.println("Waiting or received ....");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment