Created
April 6, 2020 19:16
-
-
Save garyrussell/d19cfd6a0e3545768707c9d0ad794856 to your computer and use it in GitHub Desktop.
SO60641945
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Turn off logging for the org.springframework.kafka.listener package.
logging.level.org.springframework.kafka.listener=off
# Resolve the broker list from the spring.embedded.kafka.brokers property
# (presumably published by the embedded Kafka test broker - confirm).
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit build configuration from the Spring Boot starter parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Coordinates of this demo project. -->
    <groupId>com.example</groupId>
    <artifactId>so60641945</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so60641945</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- No versions are declared below; presumably they are managed by the parent POM. -->
    <dependencies>
        <!-- Runtime dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import java.util.List; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.kafka.annotation.KafkaListener; | |
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | |
import org.springframework.kafka.listener.ErrorHandler; | |
import org.springframework.kafka.listener.MessageListenerContainer; | |
import org.springframework.kafka.listener.SeekToCurrentErrorHandler; | |
import org.springframework.retry.backoff.FixedBackOffPolicy; | |
import org.springframework.retry.policy.AlwaysRetryPolicy; | |
import org.springframework.retry.support.RetryTemplate; | |
import org.springframework.stereotype.Component; | |
@SpringBootApplication | |
public class So60641945Application { | |
private static final Logger LOG = LoggerFactory.getLogger(So60641945Application.class); | |
public static void main(String[] args) { | |
SpringApplication.run(So60641945Application.class, args); | |
} | |
@Bean | |
public ErrorHandler eh() { | |
class MyEH extends SeekToCurrentErrorHandler { | |
MyEH() { | |
super(-1); | |
} | |
@Override | |
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, | |
MessageListenerContainer container) { | |
LOG.info("handle"); | |
super.handle(thrownException, records, consumer, container); | |
} | |
}; | |
return new MyEH(); | |
} | |
private final AtomicInteger count = new AtomicInteger(); | |
@KafkaListener(id = "so60641945", topics = "so60641945") | |
public void listen(String in) { | |
LOG.info(in + this.count.incrementAndGet()); | |
throw new RuntimeException(); | |
} | |
@Bean | |
public NewTopic topic() { | |
return new NewTopic("so60641945", 1, (short) 1); | |
} | |
} | |
@Component | |
class FactoryConfigurer { | |
FactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) { | |
RetryTemplate retryTemplate = new RetryTemplate(); | |
retryTemplate.setRetryPolicy(new AlwaysRetryPolicy()); | |
FixedBackOffPolicy backOff = new FixedBackOffPolicy(); | |
backOff.setBackOffPeriod(1000); | |
retryTemplate.setBackOffPolicy(backOff); | |
factory.setRetryTemplate(retryTemplate); | |
factory.setStatefulRetry(true); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import org.junit.ClassRule; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.kafka.core.KafkaTemplate; | |
import org.springframework.kafka.test.rule.EmbeddedKafkaRule; | |
import org.springframework.test.context.junit4.SpringRunner; | |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
public class So60641945ApplicationTests { | |
@ClassRule | |
public static EmbeddedKafkaRule embedded = new EmbeddedKafkaRule(1); | |
@Autowired | |
private KafkaTemplate<String, String> template; | |
@Test | |
public void contextLoads() throws InterruptedException { | |
this.template.send("so60641945", "bar"); | |
Thread.sleep(30_000); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment