Last active January 10, 2023 14:57
JUnit 5 integration test with Spring Cloud Stream and embedded Kafka
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import com.example.demo.DemoApplication.MessageRequestConsumer;
import com.example.demo.DemoApplication.MessageRequestProducer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

@SpringBootApplication
@EnableBinding({MessageRequestProducer.class, MessageRequestConsumer.class})
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    // Output binding: messages sent to this channel are published to the broker.
    public static interface MessageRequestProducer {
        public static final String CHANNEL = "messageRequestOutput";
        @Output(CHANNEL)
        MessageChannel messageRequestOutput();
    }
    // Input binding: messages consumed from the broker arrive on this channel.
    public static interface MessageRequestConsumer {
        public static final String CHANNEL = "messageRequestInput";
        @Input(CHANNEL)
        SubscribableChannel messageRequestInput();
    }
    @Component
    public static class MessageRequestListener {
        @StreamListener(MessageRequestConsumer.CHANNEL)
        public void handle(MessageRequest req) {
            System.out.println("Do something");
        }
    }
    public static class MessageRequest {
        private String id;
        @JsonCreator
        public MessageRequest(@JsonProperty("id") String id) {
            this.id = id;
        }
        public String getId() {
            return id;
        }
    }
}
package com.example.demo;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.Test;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;
import com.example.demo.DemoApplication.MessageRequest;
import com.example.demo.DemoApplication.MessageRequestListener;
import com.example.demo.DemoApplication.MessageRequestProducer;

// The log.dir here avoids not-writeable exceptions that occur if the embedded broker tries to
// write to a normal filesystem location outside of the build directory.
@EmbeddedKafka(brokerProperties = "log.dir=target/${random.uuid}/embedded-kafka")
@SpringBootTest(
    properties = {
        // bridge between embedded Kafka and Spring Cloud Stream
        // using real kafka
    })
public class DemoApplicationTests {

    @TestConfiguration
    static class Config {
        @Bean
        public BeanPostProcessor messageRequestListenerPostProcessor() {
            return new ProxiedMockPostProcessor(MessageRequestListener.class);
        }
    }

    /**
     * Rationale: I want real functionality to happen in the proxied
     * {@literal @}StreamListener, but I also want to directly validate that the methods
     * I expected were called.
     *
     * @author Phillip Verheyden (phillipuniverse)
     */
    static class ProxiedMockPostProcessor implements BeanPostProcessor {
        private final Class<?> mockedClass;

        public ProxiedMockPostProcessor(Class<?> mockedClass) {
            this.mockedClass = mockedClass;
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName)
                throws BeansException {
            // Replace the real bean with a mock that forwards every call to it.
            if (mockedClass.isInstance(bean)) {
                return Mockito.mock(mockedClass, AdditionalAnswers.delegatesTo(bean));
            }
            return bean;
        }
    }

    @Autowired
    private MessageRequestListener listener;

    @Autowired
    private MessageRequestProducer producer;

    @Test
    public void messageIsReceived() {
        MessageRequest req = new MessageRequest("abc123");
        producer.messageRequestOutput().send(MessageBuilder.withPayload(req).build());

        // Verify that the message actually gets received. Need to do a timeout because I cannot
        // manually force a consumption of this message from Kafka. The default for timeout() is
        // to check every 10ms up to the timeout.
        verify(listener, timeout(5000))
            .handle(argThat(m -> m.getId().equals(req.getId())));
    }
}
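The test above leans on two ideas: a wrapper that records calls to the listener while still delegating to the real object, and a verification that polls until a timeout because consumption from Kafka is asynchronous. The following is a minimal JDK-only sketch of both ideas, not Mockito's implementation. The names `DelegatingSpySketch`, `Listener`, `recording`, and `awaitTrue` are hypothetical and exist only for illustration; the 10 ms poll interval simply mirrors the behaviour described in the test comment.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class DelegatingSpySketch {

    /** Hypothetical stand-in for the listener under test. */
    public interface Listener {
        void handle(String id);
    }

    /** Wraps a real Listener: records each handled id, then forwards the call to the real object. */
    static Listener recording(Listener real, List<String> seen) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("handle")) {
                seen.add((String) args[0]);
            }
            // Delegate every call (including toString/hashCode/equals) to the real object.
            return method.invoke(real, args);
        };
        return (Listener) Proxy.newProxyInstance(
                Listener.class.getClassLoader(), new Class<?>[] {Listener.class}, handler);
    }

    /** Polls the condition every pollMillis until it holds or timeoutMillis has elapsed. */
    static boolean awaitTrue(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> seen = new CopyOnWriteArrayList<>();
        Listener real = id -> System.out.println("Do something with " + id);
        Listener listener = recording(real, seen);

        // Simulate asynchronous delivery, as a Kafka consumer thread would do.
        Thread delivery = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            listener.handle("abc123");
        });
        delivery.start();

        boolean received = awaitTrue(() -> seen.contains("abc123"), 5000, 10);
        System.out.println("received = " + received);
    }
}
```

The real listener still runs, so side effects happen as in production, while the recorded calls give the test something concrete to assert on once the message arrives.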
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="" xmlns:xsi=""
<relativePath/> <!-- lookup parent from repository -->
<description>Demo project for Spring Boot</description>

I am getting the error below while executing the Kafka test case. Any suggestions on how to fix the issue?

2020-09-11 15:55:17 org.apache.zookeeper.ClientCnxn - Opening socket connection to server Will not attempt to authenticate using SASL (unknown error)
2020-09-11 15:55:17 logType=WARN org.apache.zookeeper.ClientCnxn - Session 0x0 for server, unexpected error, closing socket connection and attempting reconnect
java.nio.channels.UnresolvedAddressException: null
at java.base/
at java.base/
at java.base/
at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(
at org.apache.zookeeper.ClientCnxnSocketNIO.connect(
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(
at org.apache.zookeeper.ClientCnxn$
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(
at com.intellij.rt.junit.JUnitStarter.main(
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server '' with timeout of 6000 ms
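
For context on the trace above: `UnresolvedAddressException` is thrown by the JDK when a channel is asked to connect to an `InetSocketAddress` whose host name was never resolved, so a reasonable first check is whether the ZooKeeper host used by the embedded broker resolves on the machine running the test. The following is a minimal JDK-only sketch that reproduces the same exception. The host name `zookeeper.invalid` and port 2181 are placeholders for illustration, not values taken from the log, and `UnresolvedAddressSketch` and `failsAsUnresolved` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

class UnresolvedAddressSketch {

    /** Returns true if connecting fails because the host name was never resolved. */
    static boolean failsAsUnresolved(InetSocketAddress address) {
        try (SocketChannel channel = SocketChannel.open()) {
            channel.connect(address);
            return false; // connected, so the address was resolved
        } catch (UnresolvedAddressException e) {
            return true; // same exception type as in the ZooKeeper client trace
        } catch (IOException e) {
            return false; // resolved, but the connection failed for another reason
        }
    }

    public static void main(String[] args) {
        // Placeholder host and port; createUnresolved never performs a DNS lookup.
        InetSocketAddress address = InetSocketAddress.createUnresolved("zookeeper.invalid", 2181);
        System.out.println("isUnresolved = " + address.isUnresolved());
        System.out.println("failsAsUnresolved = " + failsAsUnresolved(address));
    }
}
```

If the address the test actually uses turns out to be unresolved, the cause is likely in host name resolution on the test machine rather than in the test code itself.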
