Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-test-sample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-test-sample</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.mimacom.kafka.eventsourcing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.singleton;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class SimpleKafkaTest {
private static final String TOPIC = "domain-events";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
private Consumer<String, String> consumer;
@Before
public void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(singleton(TOPIC));
consumer.poll(0);
}
@After
public void tearDown() {
consumer.close();
}
@Test
public void kafkaSetup_withTopic_ensureSendMessageIsReceived() {
// Arrange
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
// Act
producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
producer.flush();
// Assert
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
}
}
@mayshwartz

This comment has been minimized.

Copy link

commented Mar 10, 2019

hi, as I saw - KafkaEmbedded is deprecated. will this example still work?

@dineshbhagat

This comment has been minimized.

Copy link

commented Apr 29, 2019

@mayshwartz, It has been replaced by EmbeddedKafkaRule, some config changes are needed else concept remains the same.

@fjoalland

This comment has been minimized.

Copy link

commented Jul 9, 2019

Hello,

Why do I need to import static org.assertj.core.api.Assertions.assertThat to be sure it will work, even if I don't need it?

Thank you

@vzickner

This comment has been minimized.

Copy link
Owner Author

commented Jul 10, 2019

Hi @fjoalland,
You don't need to. I just used it for the assertions at the end of the test.

Valentin

@fjoalland

This comment has been minimized.

Copy link

commented Jul 10, 2019

Thanks @vzickner

But that's weird, even if I remove the assertions at the end of the test it will crash here "ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);".

Anyway, I like your example, it's working for me. But do you think it's possible to test my Service with this example? I have a service who is communicating with a kafka server, and the problem is, when I import this service in my test and run the specific method who communicate with Kafka, it will send a message in my real kafka server. I don't want that, I thought kafka-test will make a kafka server mock everywhere and not only in my test.

@vzickner

This comment has been minimized.

Copy link
Owner Author

commented Jul 12, 2019

Hi @fjoalland,
the test should work without the assertions as well.
The embedded kafka server is only enabled when you have the @EmbeddedKafka annotation. So you would need to skip this annotation for the other tests.

Valentin

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.