Skip to content

Instantly share code, notes, and snippets.

@twyatt
Created August 22, 2019 05:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save twyatt/d23804978272faa5dda009dabcd0e08f to your computer and use it in GitHub Desktop.
Save twyatt/d23804978272faa5dda009dabcd0e08f to your computer and use it in GitHub Desktop.
Kotlin Coroutine Experiment: 2 Flows, 1 Channel
pollChars
← CharData(value=a)
← CharData(value=b)
pollNumbers
← NumberData(value=-1)
← NumberData(value=0)
← NumberData(value=1)
pollChars
← CharData(value=c)
pollNumbers
← NumberData(value=-1)
← NumberData(value=3)
← NumberData(value=4)
← NumberData(value=5)
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.*
import kotlin.coroutines.CoroutineContext
suspend fun main() {
val merger = Merger()
GlobalScope.launch {
delay(10_000L)
merger.pollNumbers()
delay(15_000L)
merger.pollChars()
delay(2_000L)
merger.pollNumbers()
}
runBlocking {
merger.channel.consumeEach {
println("← $it")
}
}
}
sealed class Data {
data class NumberData(val value: Int) : Data()
data class CharData(val value: Char) : Data()
}
class Merger : CoroutineScope {
private val job = Job()
override val coroutineContext: CoroutineContext = job
private val api = Api()
val channel = BroadcastChannel<Data>(CONFLATED)
init {
pollChars()
}
fun pollNumbers() {
job.cancelChildren()
api.pollNumber()
.retry()
.onStart {
println("pollNumbers")
emit(api.fetchTheNumberNegativeOne())
}
.map { Data.NumberData(it) }
.onEach { channel.send(it) }
.launchIn(this)
}
fun pollChars() {
job.cancelChildren()
api.pollChar()
.onStart {
println("pollChars")
}
.retry()
.map { Data.CharData(it) }
.onEach { channel.send(it) }
.launchIn(this)
}
}
class Api {
private var i = 0
suspend fun fetchNumber(): Int {
delay(1_000L) // simulate network request
val value = i++
if (value == 2) {
error("was $value")
}
return value
}
suspend fun fetchTheNumberNegativeOne(): Int {
delay(500L) // simulate network request
return -1
}
private var c = 'a'
suspend fun fetchChar(): Char {
delay(1_500L) // simulate network request
val value = c
if (c++ > 'z') c = 'a'
return value
}
}
fun Api.pollNumber() = flow {
while (true) {
val value = fetchNumber()
emit(value)
delay(10_000L)
}
}
fun Api.pollChar() = flow {
while (true) {
val value = fetchChar()
emit(value)
delay(4_000L)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment