Created
July 30, 2023 18:53
-
-
Save marttp/e2962e71a44e401e752de697078202bb to your computer and use it in GitHub Desktop.
Content Delivery Service - Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.contentdelivery.domain.course | |
import com.fasterxml.jackson.databind.ObjectMapper | |
import dev.tpcoder.contentdelivery.configuration.properties.KafkaChannelProperties | |
import dev.tpcoder.contentdelivery.domain.course.model.Course | |
import dev.tpcoder.contentdelivery.domain.course.model.KafkaPayload | |
import dev.tpcoder.contentdelivery.domain.course.model.ProgressEvent | |
import dev.tpcoder.contentdelivery.domain.course.model.Section | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import org.slf4j.LoggerFactory | |
import org.springframework.web.bind.annotation.GetMapping | |
import org.springframework.web.bind.annotation.PostMapping | |
import org.springframework.web.bind.annotation.RequestBody | |
import org.springframework.web.bind.annotation.RequestMapping | |
import org.springframework.web.bind.annotation.RestController | |
import reactor.core.publisher.Mono | |
import reactor.kafka.sender.KafkaSender | |
import reactor.kafka.sender.SenderRecord | |
import java.time.Instant | |
import java.util.* | |
/**
 * REST endpoints under `/courses`: a course catalogue listing and a progress-event publisher.
 *
 * Progress events are serialized with [objectMapper] and published through [kafkaSender]
 * to the topic named by [kafkaChannelProperties].
 */
@RestController
@RequestMapping("/courses")
class CourseController(
    private val objectMapper: ObjectMapper,
    private val kafkaSender: KafkaSender<String, ByteArray>,
    private val kafkaChannelProperties: KafkaChannelProperties,
) {

    private val logger = LoggerFactory.getLogger(CourseController::class.java)

    /**
     * Returns the course catalogue.
     *
     * The catalogue is hard-coded: a single course containing two sections.
     */
    @GetMapping
    fun getAllCourses(): List<Course> {
        val sections = listOf(
            Section(id = 1, title = "Introduction to Java"),
            Section(id = 2, title = "Spring Boot"),
        )
        val javaBackendCourse = Course(
            id = 1,
            title = "Backend development in Java",
            description = "Backend development for people who want to be Java developers",
            section = sections,
        )
        return listOf(javaBackendCourse)
    }

    /**
     * Publishes [body] to Kafka, wrapped in a [KafkaPayload] that carries a freshly generated UUID.
     *
     * The same UUID (as a string) is used for the payload id and the Kafka message key, and the
     * UUID itself is attached as the sender record's correlation metadata. Serialization happens
     * eagerly, before the returned [Mono] is subscribed.
     *
     * @param body progress event taken from the request body
     * @return a [Mono] that emits nothing and completes once the send pipeline has finished
     */
    @PostMapping("/progress")
    fun send(@RequestBody body: ProgressEvent): Mono<Void> {
        val eventId = UUID.randomUUID()
        val messageKey = eventId.toString()
        val serialized = objectMapper.writeValueAsBytes(
            KafkaPayload(id = messageKey, data = body),
        )

        val outgoing = SenderRecord.create(
            ProducerRecord(
                kafkaChannelProperties.topic,
                null, // no explicit partition
                Instant.now().toEpochMilli(), // record timestamp
                messageKey, // Kafka message key: the per-request UUID
                serialized,
                null, // no headers
            ),
            eventId,
        )

        return kafkaSender
            .send(Mono.just(outgoing))
            .doOnNext { senderResult ->
                logger.info("Message sent to kafka channel: $senderResult")
            }
            .then()
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.contentdelivery.domain.course | |
/** Status values that a progress event can carry. */
enum class CourseStatusEnum {
    /** Work has started but is not finished. */
    IN_PROGRESS,

    /** Work is finished. */
    COMPLETED,
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.contentdelivery.configuration.properties | |
import org.springframework.boot.context.properties.ConfigurationProperties | |
/**
 * Kafka publisher settings bound from configuration keys under `communication-channel.kafka`.
 *
 * `topic`, `bootstrapServers`, `clientId` and `compressionType` have no default and must be
 * supplied; every other value falls back to the default declared here.
 *
 * @property topic name of the topic that records are published to
 * @property bootstrapServers value used for the producer's `bootstrap.servers` setting
 * @property clientId value used for the producer's `client.id` setting
 * @property retryBackoffMs value used for `retry.backoff.ms`
 * @property requestTimeoutMs value used for `request.timeout.ms`
 * @property compressionType value used for `compression.type`
 * @property acks value used for `acks`
 * @property retries value used for `retries`
 * @property maxRequestSize value used for `max.request.size`
 * @property reconnectBackoffMs value used for `reconnect.backoff.ms`
 * @property batchSize value used for `batch.size`
 * @property bufferMemory presumably intended for the producer's `buffer.memory` setting —
 *   NOTE(review): confirm it is actually wired into the producer configuration
 * @property lingerMs value used for `linger.ms`
 */
@ConfigurationProperties(prefix = "communication-channel.kafka")
data class KafkaChannelProperties(
    // Required settings (no default).
    val topic: String,
    val bootstrapServers: String,
    val clientId: String,

    val retryBackoffMs: Int = 1000,
    val requestTimeoutMs: Int = 30000,

    // Required setting (no default).
    val compressionType: String,

    val acks: String = "all",
    val retries: Int = 2000,
    val maxRequestSize: Int = 1049600,
    val reconnectBackoffMs: Int = 1000,
    val batchSize: Int = 16384,
    val bufferMemory: Int = 33554432,
    val lingerMs: Int = 5,
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.contentdelivery.domain.course.model | |
import dev.tpcoder.contentdelivery.domain.course.CourseStatusEnum | |
/**
 * A progress report identifying a course, a user and a section, together with a status.
 *
 * @property courseId identifier of the course
 * @property userId identifier of the user
 * @property sectionId identifier of the section
 * @property status reported status, one of the [CourseStatusEnum] values
 */
data class ProgressEvent(
    val courseId: Long,
    val userId: Long,
    val sectionId: Long,
    val status: CourseStatusEnum,
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tpcoder.contentdelivery.configuration | |
import dev.tpcoder.contentdelivery.configuration.properties.KafkaChannelProperties | |
import org.apache.kafka.clients.producer.ProducerConfig | |
import org.apache.kafka.common.serialization.ByteArraySerializer | |
import org.apache.kafka.common.serialization.StringSerializer | |
import org.slf4j.LoggerFactory | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
import reactor.kafka.sender.KafkaSender | |
import reactor.kafka.sender.SenderOptions | |
import java.util.* | |
/**
 * Spring configuration that builds the reactive Kafka producer from [KafkaChannelProperties].
 */
@Configuration
class PublisherConfig(private val kafkaChannelProperties: KafkaChannelProperties) {

    private val logger = LoggerFactory.getLogger(PublisherConfig::class.java)

    /**
     * Translates [kafkaChannelProperties] into Kafka producer settings and wraps them in
     * [SenderOptions].
     *
     * Keys are serialized with [StringSerializer] and values with [ByteArraySerializer]. The
     * resulting property set is logged at INFO level.
     *
     * @return sender options for a producer with `String` keys and `ByteArray` values
     */
    private fun getSenderOptions(): SenderOptions<String, ByteArray> {
        val properties = Properties()

        // Connection and identity.
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaChannelProperties.bootstrapServers
        properties[ProducerConfig.CLIENT_ID_CONFIG] = kafkaChannelProperties.clientId

        // Delivery guarantees, retries and timeouts.
        properties[ProducerConfig.ACKS_CONFIG] = kafkaChannelProperties.acks
        properties[ProducerConfig.RETRIES_CONFIG] = kafkaChannelProperties.retries
        properties[ProducerConfig.LINGER_MS_CONFIG] = kafkaChannelProperties.lingerMs
        properties[ProducerConfig.MAX_REQUEST_SIZE_CONFIG] = kafkaChannelProperties.maxRequestSize
        properties[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = kafkaChannelProperties.reconnectBackoffMs
        properties[ProducerConfig.RETRY_BACKOFF_MS_CONFIG] = kafkaChannelProperties.retryBackoffMs
        properties[ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG] = kafkaChannelProperties.requestTimeoutMs

        // Serialization and compression.
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
        properties[ProducerConfig.COMPRESSION_TYPE_CONFIG] = kafkaChannelProperties.compressionType

        // Batching and buffering. buffer.memory must come from bufferMemory; it was previously
        // set from batchSize, which left the bufferMemory property unused.
        properties[ProducerConfig.BATCH_SIZE_CONFIG] = kafkaChannelProperties.batchSize
        properties[ProducerConfig.BUFFER_MEMORY_CONFIG] = kafkaChannelProperties.bufferMemory

        logger.info("Kafka publisher properties configured : {}", properties)
        return SenderOptions.create(properties)
    }

    /** Exposes a [KafkaSender] bean built from the options produced by [getSenderOptions]. */
    @Bean
    fun kafkaSender(): KafkaSender<String, ByteArray> {
        return KafkaSender.create(getSenderOptions())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment