<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-test-sample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-test-sample</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

package com.mimacom.kafka.eventsourcing;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singleton;
import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";

    // Registered as a bean by the @EmbeddedKafka annotation.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private Consumer<String, String> consumer;

    @Before
    public void setUp() {
        // Consumer group "consumer", auto-commit disabled, pointing at the embedded broker.
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(singleton(TOPIC));
        // poll(long) is deprecated since Kafka 2.0. Unlike poll(Duration), it can block past
        // the timeout while waiting for partition assignment, so the consumer is subscribed
        // and assigned before the test sends its record.
        consumer.poll(0);
    }

    @After
    public void tearDown() {
        consumer.close();
    }

    @Test
    public void kafkaSetup_withTopic_ensureSendMessageIsReceived() {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}


mayshwartz commented Mar 10, 2019

Hi, I saw that KafkaEmbedded is deprecated. Will this example still work?

dineshbhagat commented Apr 29, 2019

@mayshwartz, it has been replaced by EmbeddedKafkaRule. Some config changes are needed; otherwise the concept remains the same.
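
For readers who want to try the rule-based setup mentioned in this reply, a minimal sketch might look like the following. It assumes JUnit 4 and the spring-kafka-test dependency from the pom.xml above (spring-kafka 2.2 or later); the class name RuleBasedKafkaTest and the test method name are made up for illustration. Note that the test class in this gist does not use the deprecated KafkaEmbedded class: it autowires an EmbeddedKafkaBroker provided by @EmbeddedKafka, so the rule is an alternative rather than a required change.

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

// Hypothetical class name; not part of the gist.
public class RuleBasedKafkaTest {

    private static final String TOPIC = "domain-events";

    // One broker, controlled shutdown enabled, TOPIC created up front.
    // The rule starts the broker before the first test and stops it after the last one;
    // no Spring application context is involved.
    @ClassRule
    public static final EmbeddedKafkaRule EMBEDDED_KAFKA = new EmbeddedKafkaRule(1, true, TOPIC);

    @Test
    public void brokerIsAvailable() {
        // The rule wraps the same EmbeddedKafkaBroker type that the gist autowires.
        EmbeddedKafkaBroker broker = EMBEDDED_KAFKA.getEmbeddedKafka();

        // The broker address can be handed to consumers and producers, for example via
        // KafkaTestUtils.consumerProps(...) and KafkaTestUtils.producerProps(...).
        assertThat(broker.getBrokersAsString()).isNotEmpty();
    }
}
```

The broker obtained from the rule can be passed to KafkaTestUtils in the same way as the autowired one in SimpleKafkaTest.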

fjoalland commented Jul 9, 2019

Hello,

Why do I need the import static org.assertj.core.api.Assertions.assertThat for this to work, even if I don't use it?

Thank you

Owner Author

vzickner commented Jul 10, 2019

Hi @fjoalland,
You don't need to. I just used it for the assertions at the end of the test.

Valentin

fjoalland commented Jul 10, 2019

Thanks @vzickner

But that's weird: even if I remove the assertions at the end of the test, it crashes here: "ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);".

Anyway, I like your example, and it's working for me. But do you think it's possible to test my service with this example? I have a service that communicates with a Kafka server. The problem is that when I import this service in my test and run the method that communicates with Kafka, it sends a message to my real Kafka server. I don't want that; I thought kafka-test would mock the Kafka server everywhere, not only in my test.

Owner Author

vzickner commented Jul 12, 2019

Hi @fjoalland,
the test should work without the assertions as well.
The embedded Kafka server is only enabled when you have the @EmbeddedKafka annotation, so you would need to omit this annotation for the other tests.

Valentin
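
On the question of keeping a service away from the real Kafka server: @EmbeddedKafka starts the embedded broker and publishes its address in the spring.embedded.kafka.brokers system property, but the application still has to be told to use that address. The sketch below assumes the service gets its Kafka connection from Spring Boot's spring.kafka.bootstrap-servers property and that the test sits in (or below) the package of the @SpringBootApplication class; the class name EmbeddedBrokerWiringTest is made up for illustration. If the service creates its own clients with a hard-coded address, this override will not reach them.

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

// Hypothetical class name; not part of the gist.
@RunWith(SpringRunner.class)
// Assumption: the application reads its broker address from spring.kafka.bootstrap-servers.
// The placeholder resolves to the address of the broker started by @EmbeddedKafka.
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "domain-events")
public class EmbeddedBrokerWiringTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    // The value that Spring Boot's Kafka auto-configuration sees in this test context.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Test
    public void applicationPointsAtEmbeddedBroker() {
        // Beans built from spring.kafka.* (for example an auto-configured KafkaTemplate)
        // now connect to the embedded broker instead of the real server.
        assertThat(bootstrapServers).isEqualTo(embeddedKafkaBroker.getBrokersAsString());
    }
}
```

With this override in place, the service bean can be autowired into the test and exercised as usual, and a consumer built from the embedded broker (as in SimpleKafkaTest) can verify what it sent.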

den-codeman commented Sep 10, 2019

EmbeddedKafkaBroker could not be autowired. Why?

Owner Author

vzickner commented Sep 10, 2019

Hi @ChDenister,
do you have the @EmbeddedKafka annotation? It registers the EmbeddedKafkaBroker bean. See also my blog post and the documentation.
Valentin

shoreviewanalytics commented Oct 14, 2019

I'm stuck getting any test to work using embedded kafka. Current message using your sample is:

SimpleKafkaTest.kafkaSetup_withTopic_ensureSendMessageIsReceived:66 [No records found for topic] expected:<[tru]e> but was:<[fals]e>

Owner Author

vzickner commented Oct 14, 2019

Hi @shoreviewanalytics,
I assume you have a timing issue and the message is sent before you are listening. The related blog post explains different ways to handle this (see Configure Kafka Consumer). Have you replaced .poll(0) with the non-deprecated version?
Valentin

shoreviewanalytics commented Oct 15, 2019

I wonder if anyone is using Linux? I am using Ubuntu 18.04 and I cannot get the embedded Kafka to work at all. I built a new bare-bones project and added this code, and it still just hangs. I am able to run other code, just not the embedded Kafka. Thanks!

shoreviewanalytics commented Oct 15, 2019

Never mind, if I use version and a recent example all is well. https://docs.spring.io/spring-kafka/docs/2.3.0.RELEASE/reference/html/#testing

jodoming commented Nov 8, 2019

> Hi @shoreviewanalytics,
> I assume you have a timing issue and the message is sent before you are listening. The related blog post explains different ways to handle this (see Configure Kafka Consumer). Have you replaced .poll(0) with the non-deprecated version?
> Valentin

Why does it make a difference whether we use the deprecated poll(long) or poll(Duration)? I thought you would explain it in your related blog post, but I haven't found any explanation.
Could you please explain the reason for this different behaviour to a complete newbie? Is it a bug? What will the solution be once the deprecated poll(long) method is no longer available?
I ask because your sample was very helpful for understanding how an embedded Kafka works.
Thanks for that!
Best regards, José
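
A note on the poll question, offered as background rather than as the author's answer: the Kafka client deprecated poll(long) in version 2.0 because it could block beyond the timeout while waiting for partition assignment, whereas poll(Duration) honors the timeout and may return before any partition has been assigned. That is documented behaviour, not a bug. In the gist, poll(0) in setUp() therefore doubles as "wait until the consumer is assigned". With poll(Duration) that wait is no longer implied, and a consumer that starts at the latest offset can miss a record sent before its assignment completes.

One possible way to stop relying on poll(long) is sketched below. It assumes spring-kafka-test 2.2 or later, declares the topic on @EmbeddedKafka (consumeFromAnEmbeddedTopic expects the topic to be one the embedded broker created), and sets auto.offset.reset to earliest so that the record is read even if it was sent before the assignment. The class name AssignmentAwareKafkaTest and the test method name are made up for illustration.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

// Hypothetical variant of SimpleKafkaTest; not part of the gist.
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(topics = "domain-events") // topic is created by the embedded broker at startup
public class AssignmentAwareKafkaTest {

    private static final String TOPIC = "domain-events";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private Consumer<String, String> consumer;

    @Before
    public void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        // Read from the beginning of the topic when the group has no committed offset.
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        // Subscribes and polls until partitions are assigned; replaces subscribe(...) + poll(0).
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, TOPIC);
    }

    @After
    public void tearDown() {
        consumer.close();
    }

    @Test
    public void sentRecordIsReceived() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();
        producer.close();

        ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
        assertThat(record.key()).isEqualTo("my-aggregate-id");
        assertThat(record.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}
```

Either measure on its own may be enough in practice; using both keeps the test independent of how quickly the consumer joins its group.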

SSamps commented Jan 3, 2020

I'm also having trouble with the @EmbeddedKafka annotation. I always seem to run into the following error in IntelliJ when trying to autowire the embeddedKafkaBroker, and I can't figure out why:

"Could not autowire. No beans of 'EmbeddedKafkaBroker' type found."
