Skip to content

Instantly share code, notes, and snippets.

@lpar lpar/KotlinPipeline.md
Last active Jul 8, 2019

Embed
What would you like to do?
Example of a processing pipeline in Kotlin — once with threads and BlockingQueue, once with channels and suspend functions
package coroutines
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
// This is an example of a three-stage multithreaded processing pipeline in Kotlin,
// with blocking operations occurring in all three stages of the pipeline, implemented
// using Kotlin `Channel` and coroutine objects.
//
// source -> filter -> output
//
// Thread.sleep is used to simulate a blocking IO operation.
suspend fun source(cout: Channel<Int>) {
println("Source starting")
for (i in 1..10) {
val x = (0..255).random()
cout.send(x)
println("Source iteration $i sent $x")
withContext(Dispatchers.IO) {
val sleep: Long = (400..600).random().toLong()
Thread.sleep(sleep)
}
}
cout.close()
println("Source exiting")
}
suspend fun filter(cin: Channel<Int>, cout: Channel<String>) {
println(" Filter starting")
for (x in cin) {
println(" Filter received $x")
withContext(Dispatchers.IO) {
val sleep: Long = (400..600).random().toLong()
Thread.sleep(sleep)
}
val y = "'$x'"
cout.send(y)
println(" Filter sent $y")
}
cout.close()
println(" Filter exiting")
}
suspend fun output(cin: Channel<String>) {
println(" Output starting")
for (x in cin) {
println(" Output received $x")
withContext(Dispatchers.IO) {
val sleep: Long = (400..600).random().toLong()
Thread.sleep(sleep)
}
}
println(" Output exiting")
}
const val queueSize = 2
fun runAll() {
runBlocking {
println("runAll starting")
val pipe1 = Channel<Int>(queueSize)
val pipe2 = Channel<String>(queueSize)
GlobalScope.launch {
launch { source(pipe1) }
launch { filter(pipe1, pipe2) }
launch { output(pipe2) }
}.join()
}
println("runAll exiting")
}
fun main() {
println("main starting")
runAll()
println("main exiting")
}

Kotlin pipeline example

Example of a processing pipeline in Kotlin, implemented two different ways:

  1. Using JVM threads and BlockingQueue (package threads)
  2. Using Kotlin coroutines and channels (package coroutines)

The pipeline in both cases goes Source -> Filter -> Output, with all three stages containing blocking code.

package coroutines
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
// This is an example of a three-stage multithreaded processing pipeline in Kotlin,
// with blocking operations occurring in all three stages of the pipeline, implemented
// using Kotlin `Channel` and coroutine objects.
//
// source -> filter -> output
//
// Thread.sleep is used to simulate a blocking IO operation.
suspend fun source(cout: Channel<Int>) {
println("Source starting")
for (i in 1..10) {
val x = (0..255).random()
cout.send(x)
println("Source iteration $i sent $x")
withContext(Dispatchers.IO) {
val sleep: Long = (400..600).random().toLong()
Thread.sleep(sleep)
}
}
cout.close()
println("Source exiting")
}
suspend fun filter(cin: Channel<Int>, cout: Channel<String>) {
println(" Filter starting")
for (x in cin) {
println(" Filter received $x")
withContext(Dispatchers.IO) {
val sleep: Long = (400..600).random().toLong()
Thread.sleep(sleep)
}
val y = "'$x'"
cout.send(y)
println(" Filter sent $y")
}
cout.close()
println(" Filter exiting")
}
suspend fun output(cin: Channel<String>) {
println(" Output starting")
for (x in cin) {
println(" Output received $x")
withContext(Dispatchers.IO) {
val sleep: Long = (400..600).random().toLong()
Thread.sleep(sleep)
}
}
println(" Output exiting")
}
const val queueSize = 2
fun runAll() {
runBlocking {
println("runAll starting")
val pipe1 = Channel<Int>(queueSize)
val pipe2 = Channel<String>(queueSize)
GlobalScope.launch {
launch { source(pipe1) }
launch { filter(pipe1, pipe2) }
launch { output(pipe2) }
}.join()
}
println("runAll exiting")
}
fun main() {
println("main starting")
runAll()
println("main exiting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.