@kirshiyin89
Created October 25, 2022 12:28
The Kafka listener implementation
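import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyKafkaListener {

    // Up to 5 delivery attempts with exponential back-off starting at 1 s; (de)serialization errors are not retried.
    @RetryableTopic(
            attempts = "5",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
            exclude = {SerializationException.class, DeserializationException.class}
    )
    @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topic}")
    public void handleMessage(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Received message: {} from topic: {}", message, topic);
        // Always fail so that the retry flow can be observed.
        throw new RuntimeException("Test exception");
    }

    // Receives the message from the dead-letter topic once all attempts are exhausted.
    @DltHandler
    public void handleDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Message: {} handled by dlq topic: {}", message, topic);
    }
}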
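
To make the retry settings above easier to picture, here is a rough standalone sketch of the schedule they should produce. It is not part of the gist and does not use Spring. The class `RetryScheduleSketch`, its helper methods, and the topic name `orders` are hypothetical. It assumes Spring Kafka's default `-retry` and `-dlt` topic suffixes and that `attempts` counts the first delivery, so `attempts = "5"` leaves four retries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, only for illustrating the configuration above.
public class RetryScheduleSketch {

    // Back-off delays in milliseconds; the first delivery is attempt 1, so there are attempts - 1 retries.
    static List<Long> delays(int attempts, long initialDelayMs, double multiplier) {
        List<Long> result = new ArrayList<>();
        double delay = initialDelayMs;
        for (int i = 0; i < attempts - 1; i++) {
            result.add((long) delay);
            delay *= multiplier;
        }
        return result;
    }

    // Topic names under SUFFIX_WITH_INDEX_VALUE, assuming the default "-retry" and "-dlt" suffixes.
    static List<String> topics(String mainTopic, int attempts) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < attempts - 1; i++) {
            result.add(mainTopic + "-retry-" + i);
        }
        result.add(mainTopic + "-dlt");
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(5, 1000, 2.0));
        System.out.println(topics("orders", 5));
    }
}
```

Under these assumptions, a message that keeps failing would wait roughly 1 s, 2 s, 4 s and 8 s between attempts before it reaches the `@DltHandler` method.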