Skip to content

Instantly share code, notes, and snippets.

@Daniel-Jacob
Last active March 4, 2022 15:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Daniel-Jacob/46532e3395cc4f5fcd66105e5cf61cc6 to your computer and use it in GitHub Desktop.
Save Daniel-Jacob/46532e3395cc4f5fcd66105e5cf61cc6 to your computer and use it in GitHub Desktop.
kafka config for fixed backoff
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff
/**
 * Spring configuration that wires Kafka listener infrastructure with a fixed
 * back-off retry policy and dead-letter publishing.
 *
 * All connection settings are taken from the injected [KafkaProperties].
 *
 * NOTE(review): the bean methods below call each other directly. This
 * presumably relies on Spring proxying the `@Configuration` class so those
 * calls resolve to the shared bean instances; for a Kotlin class that needs
 * the class to be open (e.g. via the kotlin-spring compiler plugin) — confirm
 * against the build setup.
 */
@EnableKafka
@Configuration
class KafkaConfig {

    /**
     * Creates the consumer factory from the consumer settings in [properties].
     *
     * @param properties Kafka properties supplying the consumer configuration map.
     * @return a [DefaultKafkaConsumerFactory] for nullable `String` keys and values.
     */
    @Bean
    fun consumerFactory(properties: KafkaProperties): ConsumerFactory<String?, String?> {
        val consumerConfig = properties.buildConsumerProperties()
        return DefaultKafkaConsumerFactory(consumerConfig)
    }

    /**
     * Creates the listener container factory, backed by [consumerFactory] and
     * using [errorHandler] as its common error handler.
     *
     * @param properties Kafka properties forwarded to the collaborating bean methods.
     * @return the configured [ConcurrentKafkaListenerContainerFactory].
     */
    @Bean
    fun kafkaListenerContainerFactory(
        properties: KafkaProperties,
    ): ConcurrentKafkaListenerContainerFactory<String?, String?> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String?, String?>()
        containerFactory.consumerFactory = consumerFactory(properties)
        containerFactory.setCommonErrorHandler(errorHandler(properties))
        return containerFactory
    }

    /**
     * Creates the producer factory used for publishing failed records.
     *
     * @param kafkaProperties Kafka properties supplying the producer configuration map.
     * @return a [DefaultKafkaProducerFactory] for `String` keys and values.
     */
    @Bean
    fun kafkaErrorProducerFactory(kafkaProperties: KafkaProperties): DefaultKafkaProducerFactory<String, String> =
        DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties())

    /**
     * Creates the template that the error handler's recoverer publishes through.
     *
     * @param kafkaProperties Kafka properties forwarded to [kafkaErrorProducerFactory].
     * @return a [KafkaTemplate] built on the error producer factory.
     */
    @Bean
    fun errorKafkaTemplate(kafkaProperties: KafkaProperties): KafkaTemplate<String, String> =
        KafkaTemplate(kafkaErrorProducerFactory(kafkaProperties))

    /**
     * Creates the error handler: a [DefaultErrorHandler] combining a
     * [DeadLetterPublishingRecoverer] (publishing via [errorKafkaTemplate])
     * with a [FixedBackOff].
     *
     * @param kafkaProperties Kafka properties forwarded to [errorKafkaTemplate].
     * @return the configured [DefaultErrorHandler].
     */
    @Bean
    fun errorHandler(kafkaProperties: KafkaProperties): DefaultErrorHandler {
        // FixedBackOff arguments: interval between attempts (ms) and max attempts.
        val retryIntervalMs = 3000L
        val maxRetryAttempts = 3L

        val recoverer = DeadLetterPublishingRecoverer(errorKafkaTemplate(kafkaProperties))
        return DefaultErrorHandler(recoverer, FixedBackOff(retryIntervalMs, maxRetryAttempts))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment