Comparison of Kotlin's Flow vs. Channel approach
package de.kramhal.coffeebutts
import de.kramhal.coffeebutts.Consumer.*
import de.kramhal.coffeebutts.FlowOperator.*
import de.kramhal.coffeebutts.Producer.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlin.system.measureTimeMillis
/**
* This example looks at two different approaches to typical problems with asynchronous processing of stream data in Kotlin:
* - asynchronous processing using channels
* - asynchronous processing using flows
*
* Problem #1 - Reading of data is slow on producer-side, because:
* a) either it is read from a single source (like HDD)
* b) or it is read from multiple slow sources (like different rest-endpoints)
* Problem #2 - given we have only one consumer available, consuming data is:
* a) either slow
* b) or fast
*
* Problem (1b) can be solved by using multiple coroutines, while this approach is not possible for (1a).
* If only one slow consumer is available (2a), then the only option is to parallelize reading and consuming,
* so they don't block each other.
*
* The code is only meant to compare channels and flows when trying to solve these problems and should answer:
* - How can channels be used for concurrent producing?
* - How can a flow-operator be provided that splits reading and consuming so they run concurrently?
* - How could a flow be constructed for concurrent producing?
*
* The printout is meant to validate these expectations. Each run, the demo produces the numbers 1 to 10
* with a delay of 100ms per item. The consumer just returns each number, in the slow case with an additional
* delay of 100ms. So the expectation would be:
* - sequential producing + fast consuming = about 1000ms
* - sequential producing + slow consuming = about 2000ms
* - sequential producing + parallel slow consuming = 1100ms
* - concurrently producing + slow consuming = 1100ms
* - concurrently producing + fast consuming = 100ms
*/
/** only for simulating long execution time */
suspend fun longRunningTask(i: Int): Int { delay(100); return i }
enum class Producer {
    /** Data is emitted sequentially (like reading entries from an SQL table). */
    sequential,
    /** Data is emitted concurrently (like different web-requests where we wait for an answer). */
    concurrent
}
enum class Consumer {
    /** Processing Data without extra costs. */
    fast,
    /** Processing Data involves expensive calculations. */
    slow
}
enum class FlowOperator {
    /** data is processed sequentially by the flow */
    none,
    /** producing and processing data is done concurrently by using two different flows */
    buffered
}
class Channels(
    private val consumer: Consumer = slow
) {
    suspend fun executionTime() = measureTimeMillis {
        coroutineScope {
            val channel = Channel<Int>()
            val all = (1..10).map { async { channel.send(longRunningTask(it)) } }
            launch { all.awaitAll(); channel.close() }
            val flow = flow {
                channel.consumeEach {
                    emit( if(consumer == fast) it else longRunningTask(it) )
                }
            }
            launch { flow.collect { println(it) } }
        }
    }
}
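/**
 * Flow-based approach: the producer is either a sequential flow or (for [Producer.concurrent])
 * a set of async tasks that is awaited eagerly and then exposed as a flow of the finished
 * results. The optional [FlowOperator.buffered] operator decouples producing and consuming.
 */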
class Flows(
    private val producer: Producer = sequential,
    private val consumer: Consumer = slow,
    private val flowOperator: FlowOperator = none
) {
    private suspend fun producer() = when(producer) {
        sequential -> flow { (1..10).forEach { emit(longRunningTask(it)) } }
        concurrent -> coroutineScope { (1..10).map { async { longRunningTask(it) } }.awaitAll().asFlow() }
    }
    private suspend fun consumer(ints: Flow<Int>) = when(consumer) {
        fast -> ints.collect { println(it) }
        slow -> ints.collect { println(longRunningTask(it)) }
    }
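    /**
     * Decouples the upstream flow from the collector: a [produce] coroutine collects the
     * upstream and sends into a channel, while the downstream flow drains that channel,
     * so producing and consuming run concurrently. Note: recent versions of
     * kotlinx.coroutines (1.3+) ship a built-in [kotlinx.coroutines.flow.buffer] operator
     * that serves the same purpose.
     */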
    private fun <T> Flow<T>.buffer(size: Int = 0): Flow<T> = flow {
        coroutineScope {
            val channel = produce(capacity = size) {
                collect { send(it) }
            }
            channel.consumeEach { emit(it) }
        }
    }
    suspend fun executionTime() = measureTimeMillis {
        when(flowOperator) {
            none -> consumer(producer())
            buffered -> consumer(producer().buffer())
        }
    }
}
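/**
 * A minimal alternative sketch (not called from [main]; the helper name is illustrative only):
 * on kotlinx.coroutines 1.3+ a concurrently producing flow can also be built with
 * `channelFlow`, which keeps the producer coroutines inside the flow instead of eagerly
 * awaiting all results as [Flows.producer] does for [Producer.concurrent].
 */
@Suppress("unused")
private fun concurrentProducerFlow(): Flow<Int> = kotlinx.coroutines.flow.channelFlow {
    // launch one coroutine per value; each sends its result into the flow's backing channel
    (1..10).forEach { i -> launch { send(longRunningTask(i)) } }
}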
fun main() {
    println(
        """
        Channels with concurrent slow producer:
        - slow Consumer: ${runBlocking { Channels(consumer = slow).executionTime() }}
        - fast Consumer: ${runBlocking { Channels(consumer = fast).executionTime() }}
        Flows with sequential slow producer:
        - fast Consumer: ${runBlocking { Flows(producer = sequential, consumer = fast, flowOperator = none).executionTime() }}
        - slow Consumer: ${runBlocking { Flows(producer = sequential, consumer = slow, flowOperator = none).executionTime() }}
        - operator and slow Consumer: ${runBlocking { Flows(producer = sequential, consumer = slow, flowOperator = buffered).executionTime() }}
        - operator and fast Consumer: ${runBlocking { Flows(producer = sequential, consumer = fast, flowOperator = buffered).executionTime() }}
        Flows with concurrent slow producer:
        - slow Consumer: ${runBlocking { Flows(producer = concurrent, consumer = slow, flowOperator = none).executionTime() }}
        - fast Consumer: ${runBlocking { Flows(producer = concurrent, consumer = fast, flowOperator = none).executionTime() }}
        - operator and slow Consumer: ${runBlocking { Flows(producer = concurrent, consumer = slow, flowOperator = buffered).executionTime() }}
        - operator and fast Consumer: ${runBlocking { Flows(producer = concurrent, consumer = fast, flowOperator = buffered).executionTime() }}
        """.trimIndent()
    )
}