Skip to content

Instantly share code, notes, and snippets.

@phillipuniverse
Last active January 10, 2023 14:57
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phillipuniverse/4b3d39cdcceb2363a14ebdcc170d9059 to your computer and use it in GitHub Desktop.
Save phillipuniverse/4b3d39cdcceb2363a14ebdcc170d9059 to your computer and use it in GitHub Desktop.
JUnit 5 integration test with Spring Cloud Stream and embedded Kafka
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import com.example.demo.DemoApplication.MessageRequestConsumer;
import com.example.demo.DemoApplication.MessageRequestProducer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@EnableBinding({MessageRequestProducer.class, MessageRequestConsumer.class})
@SpringBootApplication
public class DemoApplication {

    /**
     * Application entry point; hands control to Spring Boot.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    /**
     * Binding interface declaring the outbound channel for message requests.
     */
    public interface MessageRequestProducer {

        /** Name of the output binding. */
        String CHANNEL = "messageRequestOutput";

        /**
         * @return the channel that message requests are sent to
         */
        @Output(CHANNEL)
        MessageChannel messageRequestOutput();
    }

    /**
     * Binding interface declaring the inbound channel for message requests.
     */
    public interface MessageRequestConsumer {

        /** Name of the input binding. */
        String CHANNEL = "messageRequestInput";

        /**
         * @return the channel that message requests are received from
         */
        @Input(CHANNEL)
        SubscribableChannel messageRequestInput();
    }

    /**
     * Listener bean subscribed to {@link MessageRequestConsumer#CHANNEL}.
     */
    @Component
    public static class MessageRequestListener {

        /**
         * Invoked for each message request arriving on the input binding.
         *
         * @param req the incoming request (currently unused; a placeholder line is printed)
         */
        @StreamListener(MessageRequestConsumer.CHANNEL)
        public void handle(MessageRequest req) {
            System.out.println("Do something");
        }
    }

    /**
     * Payload carried over the message request channels; holds a single identifier.
     */
    public static class MessageRequest {

        private String id;

        /**
         * @param id identifier of the request, bound from the {@code "id"} JSON property
         */
        @JsonCreator
        public MessageRequest(@JsonProperty("id") String id) {
            this.id = id;
        }

        /**
         * @return the identifier supplied at construction time
         */
        public String getId() {
            return id;
        }
    }
}
package com.example.demo;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import org.junit.jupiter.api.Test;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import com.example.demo.DemoApplication.MessageRequest;
import com.example.demo.DemoApplication.MessageRequestListener;
import com.example.demo.DemoApplication.MessageRequestProducer;
@SpringBootTest
// Pointing log.dir at a directory inside the build output avoids the "not writeable" exceptions
// that occur if embedded Kafka tries to write to a normal filesystem location outside of it
@EmbeddedKafka(brokerProperties = "log.dir=target/${random.uuid}/embedded-kafka")
@TestPropertySource(
properties = {
// bridge between embedded Kafka and Spring Cloud Stream
"spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
// using real kafka
"spring.autoconfigure.exclude=org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration",
"spring.cloud.stream.bindings.messageRequestInput.group=consumer",
"spring.cloud.stream.bindings.messageRequestInput.destination=messages",
"spring.cloud.stream.bindings.messageRequestOutput.destination=messages"
})
public class DemoApplicationTests {

    /**
     * Extra test-only beans layered on top of the application context.
     */
    @TestConfiguration
    static class Config {

        /**
         * Registers a post-processor that swaps the real {@link MessageRequestListener} bean for
         * a Mockito mock delegating to it.
         */
        @Bean
        public BeanPostProcessor messageRequestListenerPostProcessor() {
            return new ProxiedMockPostProcessor(MessageRequestListener.class);
        }

        /**
         * Replaces every bean of a given type with a Mockito mock that forwards all calls to the
         * original bean. See
         * https://github.com/spring-projects/spring-boot/issues/7033#issuecomment-393213222 for
         * the rationale: the real {@literal @}StreamListener logic should still run, while the
         * test can also verify directly which methods were invoked.
         *
         * @author Phillip Verheyden (phillipuniverse)
         */
        static class ProxiedMockPostProcessor implements BeanPostProcessor {

            private final Class<?> targetType;

            /**
             * @param mockedClass type whose bean instances should be wrapped in a delegating mock
             */
            public ProxiedMockPostProcessor(Class<?> mockedClass) {
                this.targetType = mockedClass;
            }

            /**
             * Wraps matching beans in a delegating mock; every other bean is returned untouched.
             */
            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName)
                    throws BeansException {
                if (!targetType.isInstance(bean)) {
                    return bean;
                }
                return Mockito.mock(targetType, AdditionalAnswers.delegatesTo(bean));
            }
        }
    }

    @Autowired
    private MessageRequestListener listener;

    @Autowired
    private MessageRequestProducer producer;

    /**
     * Sends a request through the output binding and checks that the listener is invoked with a
     * request carrying the same id.
     */
    @Test
    public void messageIsReceived() {
        MessageRequest request = new MessageRequest("abc123");
        String expectedId = request.getId();

        producer.messageRequestOutput().send(MessageBuilder.withPayload(request).build());

        // Consumption from Kafka cannot be forced manually, so wait for the listener call instead.
        // By default timeout() re-checks every 10ms until the limit is reached.
        verify(listener, timeout(5000))
                .handle(argThat(received -> received.getId().equals(expectedId)));
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the com.example:demo project. -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from spring-boot-starter-parent 2.1.6.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<!-- Referenced by the spring-cloud-dependencies import in dependencyManagement below. -->
<spring-cloud.version>Greenwich.SR2</spring-cloud.version>
</properties>
<!--
Most dependencies below declare no version of their own.
NOTE(review): presumably they resolve through the parent POM and the imported
spring-cloud-dependencies POM; confirm.
-->
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<!-- Excludes junit:junit from the starter; junit-jupiter is declared separately below. -->
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- The only dependency in this list that pins an explicit version. -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
<version>5.4.2</version>
</dependency>
<!--
NOTE(review): DemoApplicationTests excludes
org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration via
spring.autoconfigure.exclude; presumably that class ships in this artifact. Confirm.
-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): presumably provides the @EmbeddedKafka annotation used by DemoApplicationTests; confirm. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Imports the spring-cloud-dependencies POM (type pom, scope import) at ${spring-cloud.version}. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@jayasreegit
Copy link

I am getting the error below while executing the Kafka test case. Any suggestions on how to fix the issue?

2020-09-11 15:55:17 org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/:61252. Will not attempt to authenticate using SASL (unknown error)
2020-09-11 15:55:17 logType=WARN org.apache.zookeeper.ClientCnxn - Session 0x0 for server 127.0.0.1/:61252, unexpected error, closing socket connection and attempting reconnect
java.nio.channels.UnresolvedAddressException: null
at java.base/sun.nio.ch.Net.checkAddress(Net.java:139)
at java.base/sun.nio.ch.SocketChannelImpl.checkRemote(SocketChannelImpl.java:727)
at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:741)
at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277)
at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1021)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1064)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server '127.0.0.1:61252' with timeout of 6000 ms

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment