Skip to content

Instantly share code, notes, and snippets.

@jivimberg
Last active October 11, 2023 00:16
Show Gist options
  • Star 26 You must be signed in to star a gist
  • Fork 8 You must be signed in to fork a gist
  • Save jivimberg/b0f4f94871c6f3e7d17fae1106c28047 to your computer and use it in GitHub Desktop.
Save jivimberg/b0f4f94871c6f3e7d17fae1106c28047 to your computer and use it in GitHub Desktop.
SQS Consumer using Kotlin coroutines and pool of workers.
package com.jivimberg.sqs.published
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.yield
import java.lang.Thread.currentThread
suspend fun CoroutineScope.repeatUntilCancelled(block: suspend () -> Unit) {
while (isActive) {
try {
block()
yield()
} catch (ex: CancellationException) {
println("coroutine on ${currentThread().name} cancelled")
} catch (ex: Exception) {
println("${currentThread().name} failed with {$ex}. Retrying...")
ex.printStackTrace()
}
}
println("coroutine on ${currentThread().name} exiting")
}
package com.jivimberg.sqs.published
import com.jivimberg.sqs.SQS_URL
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.future.await
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import kotlin.coroutines.CoroutineContext
class SqsSampleConsumerChannels(
private val sqs: SqsAsyncClient
) : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + supervisorJob
fun start() = launch {
val messageChannel = Channel<Message>()
repeat(N_WORKERS) { launchWorker(messageChannel) }
launchMsgReceiver(messageChannel)
}
fun stop() {
supervisorJob.cancel()
}
private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch {
repeatUntilCancelled {
val receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(SQS_URL)
.waitTimeSeconds(20)
.maxNumberOfMessages(10)
.build()
val messages = sqs.receiveMessage(receiveRequest).await().messages()
println("${Thread.currentThread().name} Retrieved ${messages.size} messages")
messages.forEach {
channel.send(it)
}
}
}
private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch {
repeatUntilCancelled {
for (msg in channel) {
try {
processMsg(msg)
deleteMessage(msg)
} catch (ex: Exception) {
println("${Thread.currentThread().name} exception trying to process message ${msg.body()}")
ex.printStackTrace()
changeVisibility(msg)
}
}
}
}
private suspend fun processMsg(message: Message) {
println("${Thread.currentThread().name} Started processing message: ${message.body()}")
delay((1000L..2000L).random())
println("${Thread.currentThread().name} Finished processing of message: ${message.body()}")
}
private suspend fun deleteMessage(message: Message) {
sqs.deleteMessage { req ->
req.queueUrl(SQS_URL)
req.receiptHandle(message.receiptHandle())
}.await()
println("${Thread.currentThread().name} Message deleted: ${message.body()}")
}
private suspend fun changeVisibility(message: Message) {
sqs.changeMessageVisibility { req ->
req.queueUrl(SQS_URL)
req.receiptHandle(message.receiptHandle())
req.visibilityTimeout(10)
}.await()
println("${Thread.currentThread().name} Changed visibility of message: ${message.body()}")
}
}
fun main() = runBlocking {
println("${Thread.currentThread().name} Starting program")
val sqs = SqsAsyncClient.builder()
.region(Region.US_EAST_1)
.build()
val consumer = SqsSampleConsumerChannels(sqs)
consumer.start()
delay(30000)
consumer.stop()
}
private const val N_WORKERS = 4
package com.jivimberg.sqs.published
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
fun main() = runBlocking {
val sqs = SqsClient.builder()
.region(Region.US_EAST_1)
.build()
var id = 0
while (true) {
id++
val sendMsgRequest = SendMessageRequest.builder()
.queueUrl(SQS_URL)
.messageBody("hello world $id")
.build()
sqs.sendMessage(sendMsgRequest)
println("Message sent with id: $id")
delay((1000L..5000L).random())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment