Created
July 30, 2023 19:11
-
-
Save marttp/3327a3182a04e1d6abb38921d30c79f5 to your computer and use it in GitHub Desktop.
Interaction Service - Consumer example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.interaction.domain.progression | |
import com.fasterxml.jackson.databind.ObjectMapper | |
import jakarta.annotation.PostConstruct | |
import jakarta.annotation.PreDestroy | |
import org.slf4j.LoggerFactory | |
import org.springframework.beans.factory.annotation.Qualifier | |
import org.springframework.stereotype.Service | |
import reactor.core.Disposable | |
import reactor.core.Disposables | |
import reactor.core.publisher.Mono | |
import reactor.kafka.receiver.KafkaReceiver | |
@Service | |
class CourseProgressReceiverWorker( | |
private val objectMapper: ObjectMapper, | |
@Qualifier("interestTopicReceiver") private val kafkaReceiver: KafkaReceiver<String, ByteArray> | |
) { | |
private val logger = LoggerFactory.getLogger(CourseProgressReceiverWorker::class.java) | |
private val disposables = Disposables.composite() | |
@PostConstruct | |
fun connect() { | |
disposables.add( | |
receiver() | |
) | |
} | |
@PreDestroy | |
fun disconnect() { | |
this.disposables.dispose() | |
} | |
fun receiver(): Disposable { | |
return kafkaReceiver.receive() | |
// .delayElements(Duration.ofSeconds(20)) | |
.doOnNext { logger.info("Key : ${it.key()}") } | |
// Don't subscribe on this step since it will load all the message to workload at once | |
// Take advantage of Backpressure | |
.flatMap { | |
val payload = objectMapper.readValue( | |
it.value(), | |
KafkaPayload::class.java | |
) | |
logger.info("Received payload : $payload") | |
Mono.just(it) | |
} | |
.doOnNext { | |
// Acknowledge the message. The offset will be committed automatically based on interval | |
it.receiverOffset().acknowledge() | |
} | |
// Subscribe this step as the last step | |
.subscribe() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.interaction.configuration.properties | |
import org.springframework.boot.context.properties.ConfigurationProperties | |
@ConfigurationProperties(prefix = "communication-channel.kafka") | |
data class KafkaChannelProperties( | |
val topic: String, | |
val bootstrapServers: String, | |
val clientId: String, | |
val groupId: String, | |
val retryBackoffMs: Int = 1000, | |
val requestTimeoutMs: Int = 30000, | |
val autoOffsetReset: String = "earliest", | |
val maxPollRecords: Int = 250, | |
val maxPollIntervalMs: Int = 300_000, | |
val sessionTimeoutMs: Int = 10_000, | |
val heartbeatIntervalMs: Int = 3000, | |
val autoCommitIntervalMs: Int = 5000, | |
val commitBatchSize: Int = 0, | |
val commitIntervalMs: Long = 5000L | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.interaction.configuration | |
import dev.tpcoder.interaction.configuration.properties.KafkaChannelProperties | |
import org.apache.kafka.clients.consumer.ConsumerConfig | |
import org.apache.kafka.common.serialization.ByteArrayDeserializer | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.slf4j.LoggerFactory | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
import reactor.kafka.receiver.KafkaReceiver | |
import reactor.kafka.receiver.ReceiverOptions | |
import java.time.Duration | |
import java.util.* | |
@Configuration | |
class ReceiverConfig(private val kafkaChannelProperties: KafkaChannelProperties) { | |
private val logger = LoggerFactory.getLogger(ReceiverConfig::class.java) | |
fun getReceiverOptions(): ReceiverOptions<String, ByteArray> { | |
val properties = Properties() | |
properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaChannelProperties.bootstrapServers | |
properties[ConsumerConfig.GROUP_ID_CONFIG] = kafkaChannelProperties.groupId | |
properties[ConsumerConfig.CLIENT_ID_CONFIG] = kafkaChannelProperties.clientId | |
properties[ConsumerConfig.RETRY_BACKOFF_MS_CONFIG] = kafkaChannelProperties.retryBackoffMs | |
properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = kafkaChannelProperties.autoOffsetReset | |
properties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = kafkaChannelProperties.maxPollRecords | |
properties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = kafkaChannelProperties.maxPollIntervalMs | |
properties[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = kafkaChannelProperties.sessionTimeoutMs | |
properties[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG] = kafkaChannelProperties.heartbeatIntervalMs | |
properties[ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG] = kafkaChannelProperties.requestTimeoutMs | |
properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java | |
properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java | |
properties[ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG] = kafkaChannelProperties.autoCommitIntervalMs | |
logger.info("Kafka receiver properties configured : {}", properties) | |
return ReceiverOptions.create<String, ByteArray>(properties) | |
.commitInterval(Duration.ofMillis(kafkaChannelProperties.commitIntervalMs)) | |
.commitBatchSize(kafkaChannelProperties.commitBatchSize) | |
} | |
@Bean | |
fun interestTopicReceiver(): KafkaReceiver<String, ByteArray> { | |
val receiverOption = getReceiverOptions() | |
return KafkaReceiver.create(receiverOption.subscription(listOf(kafkaChannelProperties.topic))) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment