Skip to content

Instantly share code, notes, and snippets.

@antonmry
Last active January 30, 2020 07:22
Show Gist options
  • Save antonmry/ab97b6d0001903713aaec42735d1cd81 to your computer and use it in GitHub Desktop.
Save antonmry/ab97b6d0001903713aaec42735d1cd81 to your computer and use it in GitHub Desktop.
SpringBoot application with Junit5 tests using EmbeddedKafka
plugins {
id 'org.springframework.boot' version '2.2.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
configurations {
developmentOnly
runtimeClasspath {
extendsFrom developmentOnly
}
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
// SpringBoot
implementation 'org.springframework.boot:spring-boot-starter-web'
// Kafka: https://docs.spring.io/spring-kafka/reference/html/#deps-for-24x
implementation 'org.springframework.kafka:spring-kafka:2.4.1.RELEASE'
implementation 'org.apache.kafka:kafka-clients:2.4.0'
testImplementation('org.springframework.kafka:spring-kafka-test:2.4.1.RELEASE') {
exclude module: 'kafka_2.11'
}
testImplementation 'org.apache.kafka:kafka-clients:2.4.0:test'
testImplementation 'org.apache.kafka:kafka_2.12:2.4.0'
testImplementation 'org.apache.kafka:kafka_2.12:2.4.0:test'
// Dev
compileOnly 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
// Testing
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
test {
useJUnitPlatform()
}
package example;
import lombok.*;
@Data
@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true)
@AllArgsConstructor
public class Foo2 {
private String foo;
}
package example;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@Slf4j
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class);
}
}
package example;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.util.backoff.FixedBackOff;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@EmbeddedKafka(topics = {"topic1", "topic1.DLT"},
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@SpringBootTest
@Slf4j
public class KafkaTests {
@Autowired
private KafkaTemplate<Object, Object> template;
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(
// dead-letter after 3 tries
new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2)));
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
final CountDownLatch latch = new CountDownLatch(4);
@KafkaListener(id = "fooGroup", topics = "topic1", containerFactory = "kafkaListenerContainerFactory")
public void listen(Foo2 foo) {
log.info("Received: " + foo);
latch.countDown();
if (foo.getFoo().startsWith("fail")) {
throw new RuntimeException("failed");
}
}
@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(String in) {
log.info("Received from DLT: " + in);
}
@Test
public void testKafka() throws Exception {
// Send
template.send("topic1", new Foo2("foo"));
template.send("topic1", new Foo2("bar"));
template.send("topic1", new Foo2("baz"));
template.send("topic1", new Foo2("qux"));
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment