Skip to content

Instantly share code, notes, and snippets.

@marttp
Created July 30, 2023 19:11
Show Gist options
  • Save marttp/3327a3182a04e1d6abb38921d30c79f5 to your computer and use it in GitHub Desktop.
Save marttp/3327a3182a04e1d6abb38921d30c79f5 to your computer and use it in GitHub Desktop.
Interaction Service - Consumer example
package dev.tpcoder.interaction.domain.progression
import com.fasterxml.jackson.databind.ObjectMapper
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Service
import reactor.core.Disposable
import reactor.core.Disposables
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
/**
 * Consumes records from the Kafka receiver qualified as `interestTopicReceiver`, logs each
 * record's key and decoded [KafkaPayload], and acknowledges the record's offset.
 *
 * The subscription is opened in [connect] after construction and disposed in [disconnect]
 * before the bean is destroyed.
 */
@Service
class CourseProgressReceiverWorker(
    private val objectMapper: ObjectMapper,
    @Qualifier("interestTopicReceiver") private val kafkaReceiver: KafkaReceiver<String, ByteArray>
) {
    private val logger = LoggerFactory.getLogger(CourseProgressReceiverWorker::class.java)

    // Holds every subscription opened by this worker so they can be disposed together.
    private val disposables = Disposables.composite()

    /** Starts consuming and registers the subscription for later disposal. */
    @PostConstruct
    fun connect() {
        disposables.add(receiver())
    }

    /** Disposes every subscription registered by [connect]. */
    @PreDestroy
    fun disconnect() {
        disposables.dispose()
    }

    /**
     * Builds the receive pipeline and subscribes to it.
     *
     * A record whose value is null or cannot be decoded into [KafkaPayload] is logged and
     * skipped rather than raised as an error signal: an error signal would terminate the
     * whole Flux, so a single malformed record would stop consumption entirely.
     * Skipped records are still acknowledged so they are not redelivered.
     *
     * @return the [Disposable] handle of the running subscription
     */
    fun receiver(): Disposable {
        return kafkaReceiver.receive()
            // .delayElements(Duration.ofSeconds(20))
            .doOnNext { logger.info("Key : ${it.key()}") }
            // Don't subscribe on this step since it will load all the messages into the workload at once.
            // Take advantage of backpressure.
            .flatMap { record ->
                // Explicitly nullable: the value comes from Java and may be null (e.g. a tombstone record).
                val rawValue: ByteArray? = record.value()
                if (rawValue == null) {
                    logger.warn("Skipping record without a value, key : ${record.key()}")
                } else {
                    try {
                        val payload = objectMapper.readValue(rawValue, KafkaPayload::class.java)
                        logger.info("Received payload : $payload")
                    } catch (e: java.io.IOException) {
                        // Jackson's parsing and data-binding exceptions are IOException subclasses.
                        logger.error("Skipping record with undecodable payload, key : ${record.key()}", e)
                    }
                }
                Mono.just(record)
            }
            .doOnNext {
                // Acknowledge the message. The offset will be committed automatically based on interval
                it.receiverOffset().acknowledge()
            }
            // Subscribe as the last step; the error consumer makes a terminated pipeline visible
            // in this class's log instead of leaving it to Reactor's dropped-error hook.
            .subscribe(
                { },
                { error -> logger.error("Kafka receive pipeline terminated", error) }
            )
    }
}
package dev.tpcoder.interaction.configuration.properties
import org.springframework.boot.context.properties.ConfigurationProperties
/**
 * Kafka consumer settings bound from configuration keys under `communication-channel.kafka`.
 *
 * `topic`, `bootstrapServers`, `clientId` and `groupId` have no default and must be supplied;
 * every other property falls back to the default declared here. The `*Ms` properties are
 * durations in milliseconds.
 */
@ConfigurationProperties(prefix = "communication-channel.kafka")
data class KafkaChannelProperties(
    // Required connection and identity settings.
    val topic: String,
    val bootstrapServers: String,
    val clientId: String,
    val groupId: String,

    // Client timing and polling settings.
    val retryBackoffMs: Int = 1_000,
    val requestTimeoutMs: Int = 30_000,
    val autoOffsetReset: String = "earliest",
    val maxPollRecords: Int = 250,
    val maxPollIntervalMs: Int = 300_000,
    val sessionTimeoutMs: Int = 10_000,
    val heartbeatIntervalMs: Int = 3_000,

    // Offset commit settings.
    val autoCommitIntervalMs: Int = 5_000,
    val commitBatchSize: Int = 0,
    val commitIntervalMs: Long = 5_000L,
)
package dev.tpcoder.interaction.configuration
import dev.tpcoder.interaction.configuration.properties.KafkaChannelProperties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import java.time.Duration
import java.util.*
/**
 * Spring configuration that builds the Reactor Kafka receiver from [KafkaChannelProperties].
 */
@Configuration
class ReceiverConfig(private val kafkaChannelProperties: KafkaChannelProperties) {

    private val logger = LoggerFactory.getLogger(ReceiverConfig::class.java)

    /**
     * Translates [KafkaChannelProperties] into Kafka consumer properties, logs them, and wraps
     * them in [ReceiverOptions] with the configured commit interval and commit batch size.
     *
     * @return receiver options for String keys and ByteArray values, without a topic subscription
     */
    fun getReceiverOptions(): ReceiverOptions<String, ByteArray> {
        val settings = kafkaChannelProperties
        val consumerProps = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.bootstrapServers)
            put(ConsumerConfig.GROUP_ID_CONFIG, settings.groupId)
            put(ConsumerConfig.CLIENT_ID_CONFIG, settings.clientId)
            put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, settings.retryBackoffMs)
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, settings.autoOffsetReset)
            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, settings.maxPollRecords)
            put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, settings.maxPollIntervalMs)
            put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, settings.sessionTimeoutMs)
            put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, settings.heartbeatIntervalMs)
            put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, settings.requestTimeoutMs)
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
            put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, settings.autoCommitIntervalMs)
        }
        logger.info("Kafka receiver properties configured : {}", consumerProps)

        val commitInterval = Duration.ofMillis(settings.commitIntervalMs)
        return ReceiverOptions.create<String, ByteArray>(consumerProps)
            .commitInterval(commitInterval)
            .commitBatchSize(settings.commitBatchSize)
    }

    /**
     * Creates the receiver bean, subscribed to the single topic named in the properties.
     */
    @Bean
    fun interestTopicReceiver(): KafkaReceiver<String, ByteArray> {
        val topics = listOf(kafkaChannelProperties.topic)
        val subscribedOptions = getReceiverOptions().subscription(topics)
        return KafkaReceiver.create(subscribedOptions)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment