Skip to content

Instantly share code, notes, and snippets.

Forked from itzg/
Created January 21, 2020 12:17
Show Gist options
  • Save qyf404/300f43c42c12f8b1a9b571c5cb485215 to your computer and use it in GitHub Desktop.
Save qyf404/300f43c42c12f8b1a9b571c5cb485215 to your computer and use it in GitHub Desktop.
Using embedded Kafka in Spring Boot unit test
# disable serving of static web files since this is a REST/Actuator only web app
add-mappings: false
value-serializer: org.apache.kafka.common.serialization.StringSerializer
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;
import static org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
// Test with a "real", but embedded Kafka instance, gives us the EmbeddedKafkaBroker to autowire
// We only need to test Kafka serialization interactions, so keep partitioning simple
partitions = 1,
// use some non-default topics to test via
topics = {
// Using @SpringBootTest mainly so we can get the standard properties binding bootstrap processing
// and the properties source. The loading and binding of {@link KafkaTopicProperties} is one of
// the main things we're testing in this suite.
// tell Spring Boot Kafka auto-config about the embedded kafka endpoints
properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
// slice our unit test app context down to just these specific pieces
classes = {
// ...the service to test
// ...use standard Spring Boot Kafka auto-config to give us KafkaTemplate, etc
// ...and our additional test config
public class KafkaProducerTest {
public static final String TOPIC_METRICS = "test.metrics.json";
// Declare our own unit test Spring config
public static class TestConfig {
// Adjust our standard topic properties to point metrics at our test topic
// Supplies the AppProperties instance used by this test configuration.
// Returns a freshly constructed AppProperties.
public AppProperties appProperties() {
final AppProperties properties = new AppProperties();
// NOTE(review): nothing is set on `properties` between construction and return in this
// extract, although the surrounding comment says the metrics topic should be pointed at the
// test topic -- a configuration line appears to be missing here; confirm against the original.
return properties;
// IntelliJ gets confused finding this broker bean when @SpringBootTest is activated
// Autowire the kafka broker registered via @EmbeddedKafka
private EmbeddedKafkaBroker embeddedKafka;
// Autowire the service we're testing
OurService ourService;
public void testMetricsEncodedAsSent() {
ourService.send("tenant-1", KafkaMessageType.METRIC, "{\"id\":1}");
final Consumer<String, String> consumer = buildConsumer(
embeddedKafka.consumeFromEmbeddedTopics(consumer, TOPIC_METRICS);
final ConsumerRecord<String, String> record = getSingleRecord(consumer, TOPIC_METRICS, 500);
// Use Hamcrest matchers provided by spring-kafka-test
assertThat(record, hasKey("tenant-1"));
assertThat(record, hasValue("{\"id\":1}"));
private <K,V> Consumer<K, V> buildConsumer(Class<? extends Deserializer> keyDeserializer,
Class<? extends Deserializer> valueDeserializer) {
// Use the procedure documented at
final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("testMetricsEncodedAsSent", "true", embeddedKafka);
// Since we're pre-sending the messages to test for, we need to read from start of topic
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// We need to match the ser/deser used in expected application config
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
final DefaultKafkaConsumerFactory<K, V> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
return consumerFactory.createConsumer();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment