Skip to content

Instantly share code, notes, and snippets.

@marttp
Created July 30, 2023 18:53
Show Gist options
  • Save marttp/e2962e71a44e401e752de697078202bb to your computer and use it in GitHub Desktop.
Content Delivery Service - Example
package dev.tpcoder.contentdelivery.domain.course
import com.fasterxml.jackson.databind.ObjectMapper
import dev.tpcoder.contentdelivery.configuration.properties.KafkaChannelProperties
import dev.tpcoder.contentdelivery.domain.course.model.Course
import dev.tpcoder.contentdelivery.domain.course.model.KafkaPayload
import dev.tpcoder.contentdelivery.domain.course.model.ProgressEvent
import dev.tpcoder.contentdelivery.domain.course.model.Section
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import java.time.Instant
import java.util.*
@RestController
@RequestMapping("/courses")
class CourseController(
    private val objectMapper: ObjectMapper,
    private val kafkaSender: KafkaSender<String, ByteArray>,
    private val kafkaChannelProperties: KafkaChannelProperties
) {

    private val logger = LoggerFactory.getLogger(CourseController::class.java)

    /**
     * Returns the course catalog.
     *
     * The catalog is a hard-coded in-memory sample (one course with two
     * sections) — presumably a stand-in for a real data source; verify
     * before production use.
     */
    @GetMapping
    fun getAllCourses(): List<Course> = listOf(
        Course(
            id = 1,
            title = "Backend development in Java",
            description = "Backend development for people who want to be Java developers",
            section = listOf(
                Section(id = 1, title = "Introduction to Java"),
                Section(id = 2, title = "Spring Boot"),
            ),
        ),
    )

    /**
     * Publishes the posted [ProgressEvent] to the configured Kafka topic.
     *
     * A fresh UUID is generated per request and used both as the Kafka
     * message key and as the payload id; the same UUID is attached to the
     * [SenderRecord] as correlation metadata.
     *
     * @param body the progress event received from the client
     * @return completes (empty) once the send result has been observed
     */
    @PostMapping("/progress")
    fun send(@RequestBody body: ProgressEvent): Mono<Void> {
        val eventId = UUID.randomUUID()
        val messageKey = eventId.toString()
        val serializedPayload = objectMapper.writeValueAsBytes(
            KafkaPayload(
                id = eventId.toString(),
                data = body,
            )
        )
        // Partition (2nd arg) and headers (last arg) are intentionally null:
        // Kafka derives the partition from the message key.
        val record = ProducerRecord(
            kafkaChannelProperties.topic,
            null,
            Instant.now().toEpochMilli(),
            messageKey, // Kafka Message key for idempotency
            serializedPayload,
            null,
        )
        return kafkaSender
            .send(Mono.just(SenderRecord.create(record, eventId)))
            .doOnNext { senderResult ->
                logger.info("Message sent to kafka channel: $senderResult")
            }
            .then()
    }
}
package dev.tpcoder.contentdelivery.domain.course
/**
 * Progress states carried by a [ProgressEvent] for a course section:
 * either the user is still working through it or has finished it.
 */
enum class CourseStatusEnum {
    IN_PROGRESS,
    COMPLETED
}
package dev.tpcoder.contentdelivery.configuration.properties
import org.springframework.boot.context.properties.ConfigurationProperties
/**
 * Kafka producer settings bound from configuration under the
 * `communication-channel.kafka` prefix.
 *
 * Fields without defaults ([topic], [bootstrapServers], [clientId],
 * [compressionType]) must be supplied in the application configuration;
 * the rest fall back to the defaults below. All values are fed into
 * `ProducerConfig` keys by `PublisherConfig`.
 */
@ConfigurationProperties(prefix = "communication-channel.kafka")
data class KafkaChannelProperties(
    // Destination topic for progress events.
    val topic: String,
    // Comma-separated broker list (bootstrap.servers).
    val bootstrapServers: String,
    // Logical producer id (client.id), useful for broker-side logging/quotas.
    val clientId: String,
    // Backoff between retry attempts, in milliseconds (retry.backoff.ms).
    val retryBackoffMs: Int = 1000,
    // Max time to wait for a broker response, in milliseconds (request.timeout.ms).
    val requestTimeoutMs: Int = 30000,
    // Compression codec name (compression.type), e.g. "none", "gzip", "snappy".
    val compressionType: String,
    // acks=all waits for the full ISR — strongest durability setting.
    val acks: String = "all",
    // NOTE(review): 2000 retries is unusually high; combined with
    // retry.backoff.ms=1000 a persistently failing send could retry for a
    // very long time — confirm this is intentional.
    val retries: Int = 2000,
    // Max serialized request size in bytes (max.request.size), ~1 MiB here.
    val maxRequestSize: Int = 1049600,
    // Backoff before reconnecting to a broker, in milliseconds.
    val reconnectBackoffMs: Int = 1000,
    // Batch size in bytes (batch.size); 16384 = 16 KiB, the Kafka default.
    val batchSize: Int = 16384,
    // Total producer buffer in bytes (buffer.memory); 33554432 = 32 MiB.
    val bufferMemory: Int = 33554432,
    // Time to linger for batching, in milliseconds (linger.ms).
    val lingerMs: Int = 5,
)
package dev.tpcoder.contentdelivery.domain.course.model
import dev.tpcoder.contentdelivery.domain.course.CourseStatusEnum
/**
 * A user's progress update for one section of a course, received on
 * `POST /courses/progress` and forwarded to Kafka.
 *
 * Identifier fields are raw numeric ids — presumably foreign keys into the
 * course/user stores; verify against the owning services.
 */
data class ProgressEvent(
    // Course the section belongs to.
    val courseId: Long,
    // User reporting the progress.
    val userId: Long,
    // Section whose status changed.
    val sectionId: Long,
    // New state of the section for this user.
    val status: CourseStatusEnum
)
package dev.tpcoder.contentdelivery.configuration
import dev.tpcoder.contentdelivery.configuration.properties.KafkaChannelProperties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import java.util.*
/**
 * Wires up the reactive Kafka producer ([KafkaSender]) from the externally
 * bound [KafkaChannelProperties].
 */
@Configuration
class PublisherConfig(private val kafkaChannelProperties: KafkaChannelProperties) {

    private val logger = LoggerFactory.getLogger(PublisherConfig::class.java)

    /**
     * Translates [KafkaChannelProperties] into reactor-kafka [SenderOptions]
     * with a String key serializer and a ByteArray value serializer.
     */
    private fun getSenderOptions(): SenderOptions<String, ByteArray> {
        val properties = Properties()
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaChannelProperties.bootstrapServers
        properties[ProducerConfig.CLIENT_ID_CONFIG] = kafkaChannelProperties.clientId
        properties[ProducerConfig.ACKS_CONFIG] = kafkaChannelProperties.acks
        properties[ProducerConfig.RETRIES_CONFIG] = kafkaChannelProperties.retries
        properties[ProducerConfig.LINGER_MS_CONFIG] = kafkaChannelProperties.lingerMs
        properties[ProducerConfig.MAX_REQUEST_SIZE_CONFIG] = kafkaChannelProperties.maxRequestSize
        properties[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = kafkaChannelProperties.reconnectBackoffMs
        properties[ProducerConfig.RETRY_BACKOFF_MS_CONFIG] = kafkaChannelProperties.retryBackoffMs
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
        properties[ProducerConfig.COMPRESSION_TYPE_CONFIG] = kafkaChannelProperties.compressionType
        properties[ProducerConfig.BATCH_SIZE_CONFIG] = kafkaChannelProperties.batchSize
        // BUG FIX: buffer.memory was previously assigned batchSize (16 KiB),
        // silently ignoring the configured bufferMemory (32 MiB default) and
        // starving the producer's record accumulator.
        properties[ProducerConfig.BUFFER_MEMORY_CONFIG] = kafkaChannelProperties.bufferMemory
        properties[ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG] = kafkaChannelProperties.requestTimeoutMs
        // NOTE(review): this logs every producer property at INFO — fine here
        // since none are secrets, but revisit if credentials are ever added.
        logger.info("Kafka publisher properties configured : {}", properties)
        return SenderOptions.create(properties)
    }

    /**
     * Shared reactive Kafka producer bean; [KafkaSender] is thread-safe and
     * intended to be reused application-wide.
     */
    @Bean
    fun kafkaSender(): KafkaSender<String, ByteArray> {
        return KafkaSender.create(getSenderOptions())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment