Skip to content

Instantly share code, notes, and snippets.

@svanellewee
Created February 22, 2022 10:45
Show Gist options
  • Save svanellewee/a290de1edbddfaba431400d052613347 to your computer and use it in GitHub Desktop.
Save svanellewee/a290de1edbddfaba431400d052613347 to your computer and use it in GitHub Desktop.
Golang style pipelines/merge ops in Kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlin.random.Random
import kotlin.random.nextUInt
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
data class TestMessage(val id: Int, val startTime: Long, val data: UInt, val streamName: String, val newData: UInt = 0u)
fun CoroutineScope.merge(vararg channels: ReceiveChannel<TestMessage>) : Channel<TestMessage> {
val outgoing = Channel<TestMessage>()
val jobs = channels.map { channel ->
launch {
for ( message in channel) {
outgoing.send(message)
}
}
}
launch {
jobs.joinAll()
println("done merging")
outgoing.close()
}
return outgoing
}
fun CoroutineScope.incomingMessages(maxMessages: Int = 40_000, streamName: String, filter: (UInt) -> Boolean): Channel<TestMessage> {
val output = Channel<TestMessage>()
launch(Dispatchers.Default) {
for (i in 0..maxMessages) {
if (filter(i.toUInt())) output.send(TestMessage(i, System.currentTimeMillis(), i.toUInt(), streamName))
}
output.close()
}
return output
}
data class OutMessage(val workerId: Int, val message: TestMessage, val threadName: String)
fun CoroutineScope.workerPool(numberOfWorkers: Int, incoming: ReceiveChannel<TestMessage>, work: suspend (TestMessage) -> TestMessage): Channel<OutMessage> {
val outgoing = Channel<OutMessage>()
val jobs = (1..numberOfWorkers).map{ workerId ->
launch(Dispatchers.Default) {
for(message in incoming) {
outgoing.send(OutMessage(workerId, work(message), Thread.currentThread().name))
}
}
}
launch {
jobs.joinAll()
println("closing channel")
outgoing.close()
}
return outgoing
}
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking {
val answer = measureTime {
val evenValues = incomingMessages(1000, "even") {
(it % 2u) == 0u
}
val oddValues = incomingMessages(1000, "odd") {
(it % 2u) != 0u
}
val messages = merge(evenValues, oddValues)
val outgoing = workerPool(500, messages) { message ->
//delay(1000)
message.copy(newData = (message.data * 2u + 100u))
}
val j = launch(Dispatchers.Default) {
for (msg in outgoing) {
val currentTime = System.currentTimeMillis()
println("result: $msg, took ${currentTime - msg.message.startTime}")
}
println("end of pappa koroutine")
}
j.join()
println("Donezies")
}
println("took = $answer")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment