Skip to content

Instantly share code, notes, and snippets.

@geomagilles
Last active December 15, 2020 11:19
Show Gist options
  • Save geomagilles/141306a13ebe44053bdb307b3de59aa7 to your computer and use it in GitHub Desktop.
Save geomagilles/141306a13ebe44053bdb307b3de59aa7 to your computer and use it in GitHub Desktop.
Building Workers With Coroutines
/** Shorthand for a [MessageToProcess] whose payload is a [TaskEngineMessage]. */
typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>
/**
 * Launches a task-engine worker on [Dispatchers.IO] as a child of this scope.
 *
 * The worker is a three-stage pipeline wired together by two default-capacity channels:
 *  1. a single puller coroutine receives messages from [taskEngineConsumer], deserializes the
 *     envelope and forwards the wrapped message to the engines;
 *  2. [enginesNumber] engine coroutines call [taskEngine] on each message and record either the
 *     output or the exception on the [MessageToProcess];
 *  3. a single acknowledger coroutine acknowledges successful messages, negatively acknowledges
 *     failed ones, and then forwards every processed message to [logChannel] when it is not null.
 *
 * If deserializing or forwarding a pulled message fails, that message is negatively acknowledged
 * and the exception is rethrown from the puller coroutine.
 *
 * @param taskEngineConsumer consumer the messages are pulled from and (negatively) acknowledged on.
 * @param taskEngine engine that handles each deserialized message.
 * @param logChannel optional channel receiving every message once it has been (n)acked.
 * @param enginesNumber number of engine coroutines to start. With a value of zero or less no engine
 *   coroutine is started, so nothing ever receives from the input channel.
 * @return the job returned by `launch` for the whole worker.
 */
fun CoroutineScope.startPulsarTaskEngineWorker(
    taskEngineConsumer: Consumer<TaskEngineEnvelope>,
    taskEngine: TaskEngine,
    logChannel: SendChannel<TaskEngineMessageToProcess>?,
    enginesNumber: Int
) = launch(Dispatchers.IO) {
    val taskInputChannel = Channel<TaskEngineMessageToProcess>()
    val taskResultsChannel = Channel<TaskEngineMessageToProcess>()

    // coroutine dedicated to pulsar message pulling
    launch(CoroutineName("task-engine-message-puller")) {
        while (isActive) {
            val message: Message<TaskEngineEnvelope> = taskEngineConsumer.receiveAsync().await()
            try {
                val envelope = readBinary(message.data, TaskEngineEnvelope.serializer())
                taskInputChannel.send(MessageToProcess(envelope.message(), message.messageId))
            } catch (e: Exception) {
                // The message never reached an engine: nack it before propagating the failure.
                taskEngineConsumer.negativeAcknowledge(message.messageId)
                throw e
            }
        }
    }

    // coroutines dedicated to Task Engine
    repeat(enginesNumber) { engineIndex ->
        launch(CoroutineName("task-engine-$engineIndex")) {
            for (messageToProcess in taskInputChannel) {
                try {
                    messageToProcess.output = taskEngine.handle(messageToProcess.message)
                } catch (e: Exception) {
                    // Cancellation of this worker is not a task failure: let it propagate instead
                    // of recording it on the message. A CancellationException raised while this
                    // coroutine is still active is treated like any other handling error.
                    if (e is java.util.concurrent.CancellationException && !isActive) throw e
                    messageToProcess.exception = e
                }
                taskResultsChannel.send(messageToProcess)
            }
        }
    }

    // coroutine dedicated to pulsar message acknowledging
    launch(CoroutineName("task-engine-message-acknowledger")) {
        for (messageToProcess in taskResultsChannel) {
            if (messageToProcess.exception == null) {
                taskEngineConsumer.acknowledgeAsync(messageToProcess.messageId).await()
            } else {
                taskEngineConsumer.negativeAcknowledge(messageToProcess.messageId)
            }
            logChannel?.send(messageToProcess)
        }
    }
}
/**
 * A message travelling through the worker pipeline, together with the outcome of its processing.
 *
 * @property message the payload to be handled.
 * @property messageId identifier of the originating message, used to acknowledge or negatively
 *   acknowledge it once processing is done.
 * @property exception the exception recorded when handling failed; `null` by default.
 * @property output the value returned by the handler; `null` by default.
 */
data class MessageToProcess<T> (
val message: T,
val messageId: MessageId,
var exception: Exception? = null,
var output: Any? = null
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment