Skip to content

Instantly share code, notes, and snippets.

@vzickner
Last active June 21, 2023 10:15
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 12 You must be signed in to fork a gist
  • Save vzickner/577c53164a97b9918a49e6c0235813f4 to your computer and use it in GitHub Desktop.
Save vzickner/577c53164a97b9918a49e6c0235813f4 to your computer and use it in GitHub Desktop.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.example.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SimpleKafkaTest {
private static final String TOPIC = "domain-events";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
BlockingQueue<ConsumerRecord<String, String>> records;
KafkaMessageListenerContainer<String, String> container;
@BeforeAll
void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(TOPIC);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
@Test
public void kafkaSetup_withTopic_ensureSendMessageIsReceived() throws Exception {
// Arrange
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
// Act
producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
producer.flush();
// Assert
ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
}
}
@kkarnwal
Copy link

kkarnwal commented Jun 3, 2020

how would we test exiting container annotated with @kafkalistener?

@vzickner
Copy link
Author

vzickner commented Jun 3, 2020

Hi @kkarnwal,
you can use the same setup with @KafkaListener. This is just a simple example how to do the setup.
Valentin

@kkarnwal
Copy link

kkarnwal commented Jun 3, 2020

If I load my existing container using spring runner, i checked with KafkaListenerEndpointRegistry, it is loading that container and showing running as status but when i sending the message using above sample producer to same topic where consumer is listening it is not comming there. what did you mean by statement - you can use the same setup with @KafkaListener
can you plz give one sample

@vzickner
Copy link
Author

vzickner commented Jun 3, 2020

What do you mean by container?

@Skwara
Copy link

Skwara commented Jul 14, 2020

@vzickner I would like to continue @kkarnwal question. I have a class that doesn't inherit from anything with a method annotated with @KafkaListener. What should I do to register that class for the topic in EmbeddedKafkaBroker?

@priyankaa-patil
Copy link

@vzickner can we use EmbeddedKafka in normal spring-boot application instead of tests?

@vzickner
Copy link
Author

@priyankaa-patil not sure if you are, but the dependency where it's part of is made for testing. I wouldn't recommend doing it

@saamsam
Copy link

saamsam commented Nov 5, 2020

Hi, do you know how to fix it this error?
Failed to load application context
caused by org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: kafka.utils.Logging.$init$(LKafka/utils/Logging;)V
Not sure what version has that class

@ilgrosso
Copy link

Thanks for this sample @vzickner!

Just FYI it seems that the two new HashMap<>(...) wrappers are not needed any more, at least in recent versions as they are set anyway internally by DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory.

@surysharma
Copy link

Hi @vzickner, thanks for this kafka spring junit gist. However, I fail to understand what are we trying to test in the above example. Looks like we are trying to produce a message and and then reading it from the consumer. How can this test be used to test the topology using an integration test?

@vzickner
Copy link
Author

Hi @surysharma, this test is just to illustrate how you can use the Kafka testing framework. You would either replace the sending or the receiving part with your actual code which you would like to test or plug the functionality you would like to have in the middle (send and receive from different topics)

@karan413255
Copy link

@vzickner I want to test some error scenarios like:

  1. Sending message to wrong topic
  2. When broker is down
  3. Timeout exception

How can I simulate this scenarios using spring-kafka-test? I had a hard time finding solutions for the above.

@vzickner
Copy link
Author

vzickner commented May 7, 2021

Hi @karan413255, I don't think they are a great fit for Spring Kafka. You would like to test your connection properties rather than the code. But looking at your list:

  1. Sending message to wrong topic: Just send the message to another topic. Question here is what your expected behavior would be.
  2. When broker is done: You don't even need to setup Spring Kafka. Just specify a port that doesn't exist. Again, expected behavior?
  3. Timeout exception: Hard to test, question is which kind of timeout? Response from Kafka or you are not receiving a message?

@sigalite
Copy link

hello, this example helped me a lot thanks. i have one question about Embedded Kafka. suppose i have few different test suites. how can i share the same embedded kafka broker? i think it will be redundant to start one for each test.
thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment