Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
SQS consumer using Kotlin coroutines and a pool of workers.
package com.jivimberg.sqs.published
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.yield
import java.lang.Thread.currentThread
/**
 * Runs [block] repeatedly for as long as the receiving scope is active.
 *
 * After each successful run the coroutine calls [yield] before the next iteration.
 * Any [Exception] thrown by [block] is logged together with its stack trace and the
 * loop carries on with the next iteration. A [CancellationException] is logged but
 * not rethrown: the loop condition is re-evaluated and the function returns normally
 * once [isActive] is `false`. Throwables that are not [Exception]s propagate to the caller.
 *
 * NOTE(review): a [CancellationException] raised while the scope is still active
 * (i.e. one that does not stem from this scope being cancelled) only leads to another
 * iteration — confirm this is the intended behavior for your [block].
 *
 * @param block the suspending action to repeat.
 */
suspend fun CoroutineScope.repeatUntilCancelled(block: suspend () -> Unit) {
    while (isActive) {
        try {
            block()
            yield()
        } catch (ex: CancellationException) {
            // Not rethrown on purpose of the loop design: `isActive` decides whether we leave.
            println("coroutine on ${currentThread().name} cancelled")
        } catch (ex: Exception) {
            // The template used to be "{$ex}", which printed stray braces around the exception.
            println("${currentThread().name} failed with $ex. Retrying...")
            ex.printStackTrace()
        }
    }
    println("coroutine on ${currentThread().name} exiting")
}
package com.jivimberg.sqs.published
import com.jivimberg.sqs.SQS_URL
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.future.await
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import kotlin.coroutines.CoroutineContext
/**
 * Sample SQS consumer that fans received messages out to a fixed pool of worker coroutines.
 *
 * A single receiver coroutine polls the queue at [SQS_URL] and sends every message it gets
 * into a [Channel]; [N_WORKERS] worker coroutines read from that channel, process each
 * message and then delete it from the queue. If processing or deletion fails, the worker
 * changes the message's visibility timeout instead.
 *
 * The class is its own [CoroutineScope]: every coroutine runs on [Dispatchers.IO] under a
 * [SupervisorJob] that [stop] cancels.
 *
 * @param sqs asynchronous SQS client used for every queue operation.
 */
class SqsSampleConsumerChannels(
    private val sqs: SqsAsyncClient
) : CoroutineScope {

    private val supervisorJob = SupervisorJob()

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + supervisorJob

    /**
     * Launches the worker pool and the message receiver.
     *
     * @return the job of the coroutine that starts the workers and the receiver.
     */
    fun start() = launch {
        val messageChannel = Channel<Message>()
        repeat(N_WORKERS) { launchWorker(messageChannel) }
        launchMsgReceiver(messageChannel)
    }

    /** Cancels the supervisor job and, with it, every coroutine launched by [start]. */
    fun stop() {
        supervisorJob.cancel()
    }

    /**
     * Launches the coroutine that keeps polling the queue and forwards each received
     * message to [channel] until it is cancelled.
     */
    private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch {
        // The request never changes between polls, so build it once instead of on every iteration.
        val receiveRequest = ReceiveMessageRequest.builder()
            .queueUrl(SQS_URL)
            .waitTimeSeconds(20)
            .maxNumberOfMessages(10)
            .build()

        repeatUntilCancelled {
            val messages = sqs.receiveMessage(receiveRequest).await().messages()
            println("${Thread.currentThread().name} Retrieved ${messages.size} messages")
            for (message in messages) {
                channel.send(message)
            }
        }
    }

    /**
     * Launches one worker coroutine that takes messages from [channel], processes them
     * and deletes them; on failure the message's visibility timeout is changed instead.
     */
    private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch {
        repeatUntilCancelled {
            for (msg in channel) {
                try {
                    processMsg(msg)
                    deleteMessage(msg)
                } catch (ex: CancellationException) {
                    // Cancellation is a shutdown signal, not a processing failure. Propagate it
                    // so it is neither logged as an error nor followed by a visibility request
                    // issued from an already-cancelled coroutine.
                    throw ex
                } catch (ex: Exception) {
                    println("${Thread.currentThread().name} exception trying to process message ${msg.body()}")
                    ex.printStackTrace()
                    changeVisibility(msg)
                }
            }
        }
    }

    /** Simulates the processing of [message] by suspending for a random 1000-2000 ms. */
    private suspend fun processMsg(message: Message) {
        println("${Thread.currentThread().name} Started processing message: ${message.body()}")
        delay((1000L..2000L).random())
        println("${Thread.currentThread().name} Finished processing of message: ${message.body()}")
    }

    /** Deletes [message] from the queue using its receipt handle and awaits the result. */
    private suspend fun deleteMessage(message: Message) {
        sqs.deleteMessage { req ->
            req.queueUrl(SQS_URL)
            req.receiptHandle(message.receiptHandle())
        }.await()
        println("${Thread.currentThread().name} Message deleted: ${message.body()}")
    }

    /** Sets the visibility timeout of [message] to 10 and awaits the result. */
    private suspend fun changeVisibility(message: Message) {
        sqs.changeMessageVisibility { req ->
            req.queueUrl(SQS_URL)
            req.receiptHandle(message.receiptHandle())
            req.visibilityTimeout(10)
        }.await()
        println("${Thread.currentThread().name} Changed visibility of message: ${message.body()}")
    }
}
/**
 * Demo entry point for the consumer: builds an async SQS client for US_EAST_1,
 * starts a [SqsSampleConsumerChannels] on it, lets it run for 30000 ms and stops it.
 */
fun main() {
    runBlocking {
        println("${Thread.currentThread().name} Starting program")

        val client = SqsAsyncClient.builder().region(Region.US_EAST_1).build()
        val channelConsumer = SqsSampleConsumerChannels(client)

        channelConsumer.start()
        // Keep the consumer running for a fixed window before shutting it down.
        delay(30000)
        channelConsumer.stop()
    }
}
// Number of worker coroutines launched by SqsSampleConsumerChannels.start().
private const val N_WORKERS = 4
package com.jivimberg.sqs.published
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
/**
 * Demo entry point for the producer: sends "hello world <id>" messages to the queue
 * at SQS_URL forever, with a counter starting at 1 and a random pause of
 * 1000-5000 ms after each send.
 */
fun main() {
    runBlocking {
        val client = SqsClient.builder().region(Region.US_EAST_1).build()

        // Endless 1, 2, 3, ... counter used as the message id.
        for (id in generateSequence(1) { it + 1 }) {
            val request = SendMessageRequest.builder()
                .queueUrl(SQS_URL)
                .messageBody("hello world $id")
                .build()
            client.sendMessage(request)
            println("Message sent with id: $id")
            delay((1000L..5000L).random())
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.