<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void kafkaSetup_withTopic_ensureSendMessageIsReceived() throws Exception {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}

@mayshwartz

mayshwartz commented Mar 10, 2019

Hi, as far as I can see, KafkaEmbedded is deprecated. Will this example still work?

@dineshbhagat

dineshbhagat commented Apr 29, 2019

@mayshwartz, it has been replaced by EmbeddedKafkaRule. Some config changes are needed; otherwise the concept remains the same.

@fjoalland

fjoalland commented Jul 9, 2019

Hello,

Why do I need to import static org.assertj.core.api.Assertions.assertThat to be sure it will work, even if I don't need it?

Thank you

@vzickner
Author

vzickner commented Jul 10, 2019

Hi @fjoalland,
You don't need to. I just used it for the assertions at the end of the test.

Valentin

@fjoalland

fjoalland commented Jul 10, 2019

Thanks @vzickner

But that's weird: even if I remove the assertions at the end of the test, it still crashes here: "ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);".

Anyway, I like your example, and it's working for me. But do you think it's possible to test my service with this example? I have a service that communicates with a Kafka server. The problem is that when I import this service in my test and run the method that talks to Kafka, it sends a message to my real Kafka server. I don't want that. I thought kafka-test would mock the Kafka server everywhere, not only in my test.

@vzickner
Author

vzickner commented Jul 12, 2019

Hi @fjoalland,
the test should work without the assertions as well.
The embedded kafka server is only enabled when you have the @EmbeddedKafka annotation. So you would need to skip this annotation for the other tests.

Valentin

@den-codeman

den-codeman commented Sep 10, 2019

EmbeddedKafkaBroker could not be autowired. Why?

@vzickner
Author

vzickner commented Sep 10, 2019

Hi @ChDenister,
do you have the @EmbeddedKafka annotation? That annotation registers the EmbeddedKafkaBroker bean. See also my blog post and the documentation.
Valentin

@shoreviewanalytics

shoreviewanalytics commented Oct 14, 2019

I'm stuck getting any test to work using embedded kafka. Current message using your sample is:

SimpleKafkaTest.kafkaSetup_withTopic_ensureSendMessageIsReceived:66 [No records found for topic] expected:<[tru]e> but was:<[fals]e>

@vzickner
Author

vzickner commented Oct 14, 2019

Hi @shoreviewanalytics,
I assume you have a timing issue and the message is sent before you are listening. The related blog posts explain different ways to handle this (see Configure Kafka Consumer). Have you replaced the .poll(0) with the non-deprecated version?
Valentin
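
The timing issue described above can be sketched without Kafka at all. The following is a minimal, stdlib-only illustration, not the gist's code: `InMemoryTopic` and `sendAndReceive` are hypothetical names, and the topic is a stand-in that, like a consumer starting at the latest offset, only delivers messages published after the subscription exists. The latch plays the role that `ContainerTestUtils.waitForAssignment(...)` plays in the test class above.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ReadyBeforeSend {

    // Hypothetical in-memory stand-in for a topic: a subscriber only sees
    // messages that are published after it has subscribed.
    static final class InMemoryTopic {
        private final List<BlockingQueue<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(BlockingQueue<String> queue) {
            subscribers.add(queue);
        }

        void publish(String message) {
            for (BlockingQueue<String> queue : subscribers) {
                queue.add(message);
            }
        }
    }

    // Returns the message the listener received, or null if none arrived in time.
    static String sendAndReceive(boolean waitUntilReady) {
        InMemoryTopic topic = new InMemoryTopic();
        BlockingQueue<String> records = new LinkedBlockingQueue<>();
        CountDownLatch ready = new CountDownLatch(1);

        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(200); // simulated start-up / partition assignment delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            topic.subscribe(records);
            ready.countDown();
        });
        listener.start();

        try {
            // Block until the listener is subscribed, if requested.
            if (waitUntilReady && !ready.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener was not ready in time");
            }
            topic.publish("event");
            String received = records.poll(1, TimeUnit.SECONDS);
            listener.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + sendAndReceive(false));
        System.out.println("with waiting:    " + sendAndReceive(true));
    }
}
```

Without the wait, the message is published before anyone is subscribed and the poll times out with null; with the wait, the listener receives "event".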

@shoreviewanalytics

shoreviewanalytics commented Oct 15, 2019

I wonder if anyone is using Linux? I am using Ubuntu 18.04 and I cannot get the embedded Kafka to work at all. I built a new bare-bones project, added this code, and it still just hangs. I am able to run other code, just not the embedded Kafka. Thanks!

@shoreviewanalytics

shoreviewanalytics commented Oct 15, 2019

Never mind, if I use version and a recent example all is well. https://docs.spring.io/spring-kafka/docs/2.3.0.RELEASE/reference/html/#testing

@jodoming

jodoming commented Nov 8, 2019

Hi @shoreviewanalytics,
I assume you have a timing issue and the message is sent before you are listening. The related blog posts explain different ways to handle this (see Configure Kafka Consumer). Have you replaced the .poll(0) with the non-deprecated version?
Valentin

Why does it make a difference whether we use the deprecated poll(long) or poll(Duration)? I thought you would explain it in your related blog post, but I haven't found any explanation there.
Could you please explain the reason for this different behaviour to a complete newbie? Is it a bug? What will the solution be when the deprecated poll(long) method is no longer available?
I ask because your sample was very helpful for understanding how an embedded Kafka works.
Thanks for that!
Best regards, José
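
On the poll(long) versus poll(Duration) question: as far as the Kafka client documentation describes it, poll(long) could block beyond its timeout while waiting for metadata and partition assignment, whereas poll(Duration) applies the timeout to that wait as well. A single short poll(Duration) can therefore return no records simply because the consumer has no partitions yet. A common remedy is to poll in a loop until records arrive or an overall deadline passes. The sketch below shows only that loop, using the standard library: `pollUntilRecords` is a hypothetical helper, and the fake poll function stands in for a wrapper that would call `consumer.poll(Duration)` and copy the records into a list.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class PollUntil {

    // Calls poll with a bounded per-call timeout until it yields records
    // or the overall deadline has passed. Always polls at least once.
    static <T> List<T> pollUntilRecords(Function<Duration, List<T>> poll,
                                        Duration perPoll,
                                        Duration overall) {
        long deadline = System.nanoTime() + overall.toNanos();
        do {
            List<T> batch = poll.apply(perPoll);
            if (!batch.isEmpty()) {
                return batch;
            }
        } while (System.nanoTime() < deadline);
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        // Fake consumer (assumption for illustration): the first two polls are
        // empty, as if partitions were not assigned yet; the third has a record.
        int[] calls = {0};
        Function<Duration, List<String>> fakePoll = timeout ->
                ++calls[0] < 3 ? Collections.<String>emptyList()
                               : Collections.singletonList("event");

        List<String> records = pollUntilRecords(fakePoll, Duration.ofMillis(100), Duration.ofSeconds(5));
        System.out.println(records + " after " + calls[0] + " polls");
    }
}
```

The per-poll timeout keeps each call short, while the overall deadline keeps a test from hanging when nothing ever arrives.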

@SSamps

SSamps commented Jan 3, 2020

I'm also having trouble with the @EmbeddedKafka annotation. I always seem to run into the following error in IntelliJ when trying to autowire the embeddedKafkaBroker and I can't figure out why:

"Could not autowire. No beans of 'EmbeddedKafkaBroker' type found."

@adhikariaman01

adhikariaman01 commented Mar 8, 2020

I'm also having trouble with the @EmbeddedKafka annotation. I always seem to run into the following error in IntelliJ when trying to autowire the embeddedKafkaBroker and I can't figure out why:

"Could not autowire. No beans of 'EmbeddedKafkaBroker' type found."

You need to replace the autowired embeddedKafkaBroker bean with an EmbeddedKafkaRule and get the embeddedKafkaBroker from the rule (note that @ClassRule is a JUnit 4 mechanism).
For example:

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, TOPIC);

EmbeddedKafkaBroker embeddedKafkaBroker = embeddedKafkaRule.getEmbeddedKafka();

@sheriseDeRice

sheriseDeRice commented May 14, 2020

Hi, I have tried to run the test class above, but I got an error:
Caused by: java.lang.NoSuchFieldError: DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
It seems to be caused by the @Autowired EmbeddedKafkaBroker.
I wonder if you have seen this error before and can provide some advice on this?

I have tried to use @ClassRule EmbeddedKafkaRule as well but also had the same error.

Thank you in advance.

@vzickner
Author

vzickner commented May 14, 2020

Hi @sheriseDeRice, which Spring version are you using? On which OS are you executing it? Do you have other dependencies than the one mentioned above?

@sheriseDeRice

sheriseDeRice commented May 14, 2020

Hi @vzickner, thank you for your response.
I am using the following Maven dependency versions:
spring-kafka: 2.4.5.RELEASE
spring-kafka-test: 2.4.5.RELEASE
junit-jupiter: 5.5.2
java: 1.8
No specific version is set for the spring-boot-starter and spring-boot-starter-test dependencies.
I am currently testing my application locally on a Windows machine.
I have more dependencies than the ones in your sample, but none of them are Kafka related.

@vzickner
Author

vzickner commented May 14, 2020

That is newer than the version which is coming with spring-boot 2.2.7.RELEASE.
However, when I change the dependencies manually to the new version the test case above still runs for me without any modifications:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.7.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.4.5.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<version>2.4.5.RELEASE</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Can you check if it works in a simplified environment? Maybe you have version conflicts somewhere in your dependency tree on your classpath.

@kkarnwal

kkarnwal commented Jun 3, 2020

How would we test an existing container annotated with @KafkaListener?

@vzickner
Author

vzickner commented Jun 3, 2020

Hi @kkarnwal,
you can use the same setup with @KafkaListener. This is just a simple example of how to do the setup.
Valentin

@kkarnwal

kkarnwal commented Jun 3, 2020

If I load my existing container using the Spring runner and check it with KafkaListenerEndpointRegistry, the container is loaded and its status is running. But when I send a message with the sample producer above to the same topic the consumer is listening on, the message does not arrive. What did you mean by the statement "you can use the same setup with @KafkaListener"?
Can you please give a sample?

@vzickner
Author

vzickner commented Jun 3, 2020

What do you mean by container?

@Skwara

Skwara commented Jul 14, 2020

@vzickner I would like to follow up on @kkarnwal's question. I have a class that doesn't inherit from anything and has a method annotated with @KafkaListener. What should I do to register that class for the topic in EmbeddedKafkaBroker?

@priyankaa-patil

priyankaa-patil commented Oct 21, 2020

@vzickner can we use EmbeddedKafka in a normal Spring Boot application instead of in tests?

@vzickner
Author

vzickner commented Oct 22, 2020

@priyankaa-patil not sure if you can, but the dependency it is part of is made for testing. I wouldn't recommend doing it.

@saamsam

saamsam commented Nov 5, 2020

Hi, do you know how to fix this error?
Failed to load application context
Caused by org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: kafka.utils.Logging.$init$(Lkafka/utils/Logging;)V
I'm not sure which version has that class.

@ilgrosso

ilgrosso commented Nov 14, 2020

Thanks for this sample @vzickner!

Just FYI it seems that the two new HashMap<>(...) wrappers are not needed any more, at least in recent versions as they are set anyway internally by DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory.

@surysharma

surysharma commented Nov 26, 2020

Hi @vzickner, thanks for this Kafka Spring JUnit gist. However, I fail to understand what we are trying to test in the above example. It looks like we are producing a message and then reading it from the consumer. How can this test be used to test the topology in an integration test?

@vzickner
Author

vzickner commented Nov 26, 2020

Hi @surysharma, this test is just to illustrate how you can use the Kafka testing framework. You would either replace the sending or the receiving part with your actual code which you would like to test or plug the functionality you would like to have in the middle (send and receive from different topics)

@karan413255

karan413255 commented Apr 28, 2021

@vzickner I want to test some error scenarios like:

  1. Sending message to wrong topic
  2. When broker is down
  3. Timeout exception

How can I simulate these scenarios using spring-kafka-test? I had a hard time finding solutions for the above.

@vzickner
Author

vzickner commented May 7, 2021

Hi @karan413255, I don't think they are a great fit for Spring Kafka. You would be testing your connection properties rather than the code. But looking at your list:

  1. Sending message to wrong topic: Just send the message to another topic. Question here is what your expected behavior would be.
  2. When broker is down: You don't even need to set up Spring Kafka. Just specify a port that doesn't exist. Again, expected behavior?
  3. Timeout exception: Hard to test, question is which kind of timeout? Response from Kafka or you are not receiving a message?
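
For the "broker is down" case, "a port that doesn't exist" can be obtained rather than guessed. The following stdlib-only sketch is one way to do that and is an assumption about how a test might be arranged, not part of the gist: `unusedPort` and `isReachable` are hypothetical helpers. The resulting address could then be passed as the bootstrap servers setting of the producer under test.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class UnreachableBroker {

    // Asks the OS for a free port and releases it again, so that nothing is
    // listening on it afterwards. Another process could grab the port in the
    // meantime, which is unlikely but possible.
    static int unusedPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("could not reserve a port", e);
        }
    }

    // Returns true only if a TCP connection can be opened within 500 ms.
    static boolean isReachable(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 500);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int port = unusedPort();
        String bootstrapServers = "127.0.0.1:" + port; // address with no broker behind it
        System.out.println(bootstrapServers + " reachable: " + isReachable("127.0.0.1", port));
    }
}
```

What the code under test should do in this situation (fail fast, retry, surface an error) is still the open question raised in the list above.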

@sigalite

sigalite commented Jul 13, 2021

Hello, this example helped me a lot, thanks. I have one question about embedded Kafka. Suppose I have a few different test suites. How can I share the same embedded Kafka broker? I think it would be redundant to start one for each test.
Thanks
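
One commonly suggested pattern for sharing a broker across test classes is a static holder that creates and starts the broker on first use and hands the same instance to every caller. The sketch below shows only the holder pattern with the standard library: `FakeBroker` is a hypothetical stand-in that counts how often it is started, where real code would create and start an EmbeddedKafkaBroker instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedBrokerHolder {

    // Hypothetical stand-in for the embedded broker; it only counts starts.
    static final class FakeBroker {
        static final AtomicInteger STARTS = new AtomicInteger();

        FakeBroker() {
            STARTS.incrementAndGet();
        }
    }

    private static FakeBroker broker;

    private SharedBrokerHolder() {
    }

    // Creates the broker on first access and returns the same instance afterwards.
    // synchronized guards against two test classes racing on the first call.
    static synchronized FakeBroker getBroker() {
        if (broker == null) {
            broker = new FakeBroker(); // real code would create and start the broker here
        }
        return broker;
    }

    public static void main(String[] args) {
        FakeBroker first = getBroker();
        FakeBroker second = getBroker();
        System.out.println("same instance: " + (first == second));
        System.out.println("starts: " + FakeBroker.STARTS.get());
    }
}
```

Each test class would call the holder instead of declaring its own broker, so the start-up cost is paid once per JVM.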
