Skip to content

Instantly share code, notes, and snippets.

@ncomet
Created December 15, 2023 14:13
Show Gist options
  • Save ncomet/1ad66b4e4261bc000b1666ce2b0288b4 to your computer and use it in GitHub Desktop.
Save ncomet/1ad66b4e4261bc000b1666ce2b0288b4 to your computer and use it in GitHub Desktop.
Experiments with coroutines and flows
package com.betclic.poker.gameserver.domain
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
/**
 * Result of racing a piece of work against a deadline.
 *
 * The hierarchy is sealed, so a `when` over an [Outcome] can be exhaustive without an `else` branch.
 */
sealed interface Outcome {
    /** The work produced a value; [result] carries it. */
    data class Ok(val result: String) : Outcome

    /** The deadline fired before the work produced a value. */
    data object Timeout : Outcome
}
/**
 * Entry point: runs the experiments sequentially; each call suspends until that experiment
 * has finished before the next one starts.
 */
suspend fun main() {
    withJobs()
    // withFlows() // disabled by the author, whose note reads: "spoiler: does not work"
    withChannels()
    withChannelAndTimeout()
    flowToChannelAndTimeout()
}
/**
 * Races a 3-second timeout producer against a 10-second work producer over a
 * [MutableSharedFlow] and handles only the first [Outcome] that is emitted.
 *
 * Exactly one line is printed ("Timeout" or the work result) and the producer that lost the
 * race is cancelled, so the function returns as soon as the race is decided instead of
 * waiting for the slower producer.
 *
 * Everything runs inside a single [coroutineScope]: no coroutine outlives this call.
 */
suspend fun withFlows() {
    val flow = MutableSharedFlow<Outcome>()
    coroutineScope {
        // A shared flow without replay drops values emitted while nobody is subscribed.
        // Starting the subscriber UNDISPATCHED runs it up to its first suspension point
        // (i.e. until it is subscribed) before any producer is even launched.
        val firstOutcome = async(start = CoroutineStart.UNDISPATCHED) { flow.first() }

        val timeoutJob = launch {
            delay(3000)
            flow.emit(Outcome.Timeout)
        }
        val workJob = launch {
            delay(10000)
            flow.emit(Outcome.Ok("Yay!"))
        }

        // Cancel the loser; otherwise coroutineScope would keep waiting for it to finish.
        when (val outcome = firstOutcome.await()) {
            is Outcome.Ok -> {
                println(outcome.result)
                timeoutJob.cancel()
            }
            is Outcome.Timeout -> {
                println("Timeout")
                workJob.cancel()
            }
        }
    }
}
/**
 * Races a 3-second timeout producer against a 10-second work producer over one [Channel],
 * prints whichever [Outcome] is received first and cancels the producer that lost.
 */
suspend fun withChannels() {
    val outcomes = Channel<Outcome>()
    coroutineScope {
        val deadline = launch {
            delay(3000)
            outcomes.send(Outcome.Timeout)
        }
        val worker = launch {
            delay(10000)
            outcomes.send(Outcome.Ok("Yay!"))
        }

        // Single-clause select: suspends until one of the producers has sent its outcome.
        val winner = select<Outcome> {
            outcomes.onReceive { it }
        }

        // Print the winner and pick the producer that is still pending.
        val loser = when (winner) {
            is Outcome.Ok -> {
                println(winner.result)
                deadline
            }
            is Outcome.Timeout -> {
                println("Timeout")
                worker
            }
        }
        // Without this, coroutineScope would wait for the slower producer to run to the end.
        loser.cancel()
    }
}
/**
 * Races two [Deferred] values, a 3-second timeout and a 10-second piece of work, using
 * `select` with one `onAwait` clause per deferred. Whichever completes first is printed and
 * the other one is cancelled.
 */
suspend fun withJobs() =
    coroutineScope {
        val deadline = async {
            delay(3000)
            Outcome.Timeout
        }
        val worker = async {
            delay(10000)
            Outcome.Ok("Yay!")
        }

        // Clauses are registered in the same order as before: timeout first, then work.
        select<Unit> {
            deadline.onAwait {
                println("Timeout")
                worker.cancel()
            }
            worker.onAwait { ok ->
                println(ok.result)
                deadline.cancel()
            }
        }
    }
/**
 * Waits for a value sent over a [Channel] by a 10-second worker, giving up after 3 seconds
 * via the `onTimeout` select clause. Prints the received value, or "Timeout" and cancels the
 * worker when the deadline wins.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun withChannelAndTimeout() {
    val results = Channel<String>()
    coroutineScope {
        val worker = launch {
            delay(10000)
            results.send("Yay!")
        }

        // The channel carries non-null Strings, so null can only come from the timeout clause.
        val received = select<String?> {
            results.onReceive { it }
            onTimeout(3000) { null }
        }

        if (received == null) {
            println("Timeout")
            // Stop the worker so coroutineScope does not wait for its remaining delay.
            worker.cancel()
        } else {
            println(received)
        }
    }
}
/**
 * Bridges a [MutableSharedFlow] into a [Channel] so that a flow emission can be raced against
 * a 3-second `onTimeout` clause inside `select`.
 *
 * Prints the bridged value if it arrives in time, otherwise prints "Timeout". In both cases
 * the producer and the bridging collector are cancelled before returning: both are children
 * of the enclosing [coroutineScope], so no coroutine outlives this call.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun flowToChannelAndTimeout() {
    val flow = MutableSharedFlow<String>()
    val channel = Channel<String>()
    coroutineScope {
        // Forwards every flow emission into the channel. Collecting a shared flow never
        // completes on its own, so this job has to be cancelled explicitly below.
        // UNDISPATCHED runs the collector up to its first suspension point, so it is
        // subscribed before the producer is launched and cannot miss the emission.
        val bridge = launch(start = CoroutineStart.UNDISPATCHED) {
            flow.collect { value ->
                channel.send(value)
            }
        }
        val job = launch {
            delay(10000)
            flow.emit("Yay!")
        }

        select {
            channel.onReceive { outcome ->
                println(outcome)
            }
            onTimeout(3000) {
                println("Timeout")
            }
        }

        // Whichever clause won, nothing else is expected: stop the producer and the bridge so
        // coroutineScope can return. Cancelling an already finished job is a no-op.
        job.cancel()
        bridge.cancel()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment