The configuration sets error handler that replays the failed events.
For more details check:
The configuration sets error handler that replays the failed events.
For more details check:
spring: | |
kafka: | |
listener: | |
ack-mode: record |
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | |
import org.springframework.kafka.core.ConsumerFactory; | |
import org.springframework.kafka.listener.SeekToCurrentErrorHandler; | |
@Configuration | |
public class KafkaConfig { | |
@Bean | |
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( | |
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, | |
ConsumerFactory<Object, Object> kafkaConsumerFactory) { | |
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); | |
configurer.configure(factory, kafkaConsumerFactory); | |
factory.setErrorHandler(new SeekToCurrentErrorHandler()); | |
return factory; | |
} | |
} |