Skip to content

Instantly share code, notes, and snippets.

@Daniel-Jacob
Created March 4, 2022 10:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Daniel-Jacob/961c56737c46f1bd125cc50ec1f8c99e to your computer and use it in GitHub Desktop.
Save Daniel-Jacob/961c56737c46f1bd125cc50ec1f8c99e to your computer and use it in GitHub Desktop.
Kafka listener that doesn't block the main topic on retries
import org.apache.kafka.common.errors.SerializationException
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Profile
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.annotation.RetryableTopic
import org.springframework.kafka.retrytopic.DltStrategy
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy
import org.springframework.kafka.support.converter.ConversionException
import org.springframework.kafka.support.serializer.DeserializationException
import org.springframework.messaging.converter.MessageConversionException
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException
import org.springframework.retry.annotation.Backoff
import org.springframework.stereotype.Component
/**
 * Sample Kafka consumer that combines [KafkaListener] with [RetryableTopic] so that
 * records whose processing fails are retried through the retry-topic mechanism
 * configured on [onReceive].
 *
 * The handler fails on purpose for every record, which makes this class a convenient
 * way to watch the retry configuration in action.
 */
@Component
class NonBlockingKafkaListener {

    private val logger = LoggerFactory.getLogger(NonBlockingKafkaListener::class.java)

    /**
     * Logs the incoming [message] and then always throws, so every record takes the
     * failure path.
     *
     * Configuration, all resolved from property placeholders or literals:
     * - the listener id comes from `spring.kafka.consumer.group-id` and the subscribed
     *   topic from `topic`;
     * - the number of attempts comes from `retry-attempts`;
     * - back-off starts at a delay of 200 and is multiplied by 3.0 per attempt;
     * - the exception types listed in `exclude` are not retried.
     *
     * NOTE(review): `maxDelay = 0` is passed explicitly; confirm against the Spring Retry
     * documentation how a zero maximum is interpreted before relying on it as a cap.
     *
     * @param message payload of the consumed record
     * @throws Exception unconditionally, after the message has been logged
     */
    @RetryableTopic(
        exclude = [
            DeserializationException::class,
            SerializationException::class,
            MessageConversionException::class,
            ConversionException::class,
            MethodArgumentResolutionException::class,
            NoSuchMethodException::class,
            ClassCastException::class,
        ],
        dltStrategy = DltStrategy.FAIL_ON_ERROR,
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        numPartitions = "1",
        backoff = Backoff(delay = 200, multiplier = 3.0, maxDelay = 0),
        attempts = "\${retry-attempts}",
    )
    @KafkaListener(
        topics = ["\${topic}"],
        id = "\${spring.kafka.consumer.group-id}",
    )
    fun onReceive(message: String) {
        logger.info("processing message: $message")
        // Deliberate failure: every record is pushed onto the retry path.
        throw Exception()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment