Skip to content

Instantly share code, notes, and snippets.

/**
 * Type-safe wrapper around the raw payment-method string, so it cannot be
 * confused with other `String` values in a signature.
 *
 * @property paymentMethod the wrapped string value.
 */
@JvmInline
value class PaymentMethod(
    val paymentMethod: String,
)
/**
 * Type-safe wrapper around the [UUID] that identifies a payment.
 *
 * @property paymentId the wrapped identifier.
 */
@JvmInline
value class PaymentId(
    val paymentId: UUID,
)
/**
 * Immutable description of a payment.
 *
 * @property paymentId identifier of the payment.
 * @property paymentMethod method used for the payment.
 */
data class Payment(
    val paymentId: PaymentId,
    val paymentMethod: PaymentMethod,
)
/**
 * Logger named after the runtime class of the receiver.
 *
 * The getter calls [LoggerFactory.getLogger] on every access; nothing is
 * cached by this property itself.
 */
val Any.log: Logger
    get() = LoggerFactory.getLogger(javaClass)
/**
 * Turns an [Error] into an ERROR-level log entry.
 */
object ErrorHandler {
    /**
     * Logs [error] together with the details carried by its concrete subtype.
     *
     * The `when` is used as an expression without an `else` branch, so adding
     * a new [Error] subtype is a compile error here until it is handled.
     *
     * @param error the error to report.
     */
    fun handleError(error: Error) = when (error) {
        // Subtypes are qualified with `Error.` so the branches resolve without
        // relying on a separate import of the nested classes.
        is Error.ValidationError ->
            log.error("a validation error has occurred: id={}, message={}", error.id, error.message)
        // Pass the throwable as the last argument so the stack trace is logged.
        is Error.IOError ->
            log.error("an IO error has occurred: ${error.message}", error.throwable)
        // NOTE(review): the message announces an exit, but nothing in this
        // handler terminates the process — confirm the caller does so.
        Error.FatalError ->
            log.error("a Fatal error has occurred... exiting")
    }
}
/**
 * Closed set of failures handled by this code.
 *
 * Note that inside this file the name shadows the default-imported
 * `kotlin.Error` alias.
 */
sealed class Error {
    /**
     * A validation failure.
     *
     * @property id numeric identifier attached to the failure.
     * @property message description of the failure.
     */
    data class ValidationError(
        val id: Long,
        val message: String,
    ) : Error()

    /**
     * An I/O failure.
     *
     * @property message description of the failure.
     * @property throwable the throwable associated with the failure.
     */
    data class IOError(
        val message: String,
        val throwable: Throwable,
    ) : Error()

    /** A fatal failure; carries no data. */
    object FatalError : Error()
}
@Daniel-Jacob
Daniel-Jacob / NonBlockingKafkaListener.kt
Created March 4, 2022 10:52
kafka listener that doesn't block the main topic on retries
import org.apache.kafka.common.errors.SerializationException
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Profile
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.annotation.RetryableTopic
import org.springframework.kafka.retrytopic.DltStrategy
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy
import org.springframework.kafka.support.converter.ConversionException
import org.springframework.kafka.support.serializer.DeserializationException
import org.springframework.messaging.converter.MessageConversionException
@Daniel-Jacob
Daniel-Jacob / KafkaConfig.kt
Last active March 4, 2022 15:09
Kafka config for retryable topic
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
@Daniel-Jacob
Daniel-Jacob / application.yml
Last active March 7, 2022 04:16
yml for retryable topic config
topic: my-topic
retry-attempts: 8
server:
port: 8080
management:
server.port: 8081
spring:
kafka:
bootstrap-servers: http://localhost:29092
properties:
@Daniel-Jacob
Daniel-Jacob / KafkaConfig.kt
Last active March 4, 2022 12:38
kafka config for exponential backoff
/**
 * Builds the [DefaultErrorHandler] bean used for exponential back-off.
 *
 * The back-off is created with an initial interval of 3000, a multiplier of
 * 2.0 and a maximum elapsed time of 60000 (milliseconds per Spring's
 * `ExponentialBackOff` API). The handler is given a
 * [DeadLetterPublishingRecoverer] that publishes through the template returned
 * by `errorKafkaTemplate`, which is defined outside this snippet.
 *
 * @param kafkaProperties Kafka properties forwarded to `errorKafkaTemplate`.
 * @return the configured error handler.
 */
@Bean
fun errorHandler(kafkaProperties: KafkaProperties): DefaultErrorHandler {
    val retryBackOff = ExponentialBackOff(3_000, 2.0).apply {
        maxElapsedTime = 60_000
    }
    val deadLetterRecoverer = DeadLetterPublishingRecoverer(errorKafkaTemplate(kafkaProperties))
    return DefaultErrorHandler(deadLetterRecoverer, retryBackOff)
}
@Daniel-Jacob
Daniel-Jacob / FixedBackOffKafkaListener.kt
Created March 4, 2022 10:40
kafka listener for fixed backoff
@Component
class FixedBackOffKafkaListener {
private val log = LoggerFactory.getLogger(FixedBackOffKafkaListener::class.java)
@KafkaListener(
id = "\${spring.kafka.consumer.group-id}",
topics = ["\${topic}"]
)
fun onReceive(message: String) {
log.info("processing message: $message")
@Daniel-Jacob
Daniel-Jacob / KafkaConfig.kt
Last active March 4, 2022 15:06
kafka config for fixed backoff
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate