Skip to content

Instantly share code, notes, and snippets.

@ncomet
Last active December 18, 2023 09:13
Show Gist options
  • Save ncomet/cca845400fd8ae7154292658eb427803 to your computer and use it in GitHub Desktop.
Save ncomet/cca845400fd8ae7154292658eb427803 to your computer and use it in GitHub Desktop.
LiveAsync
package org.example
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
/**
 * Demo driver: wires an event flow and a command flow into
 * [listenToEventsDispatchToCommandsChannel] and replays a scripted timeline,
 * one step per second.
 *
 * Expected outcome per event:
 * - "1", "2" and "5" are acknowledged by a command carrying the same value
 *   (the unrelated command "99" is ignored);
 * - "3", "4", "6" and "7" never receive a matching command and hit the listener's timeout.
 *
 * The listener collects a shared flow, which does not complete on its own, so this
 * scope (and therefore the program) keeps running after the script ends until the
 * process is stopped.
 */
suspend fun main(): Unit = coroutineScope {
    val events = MutableSharedFlow<String>()
    val commands = MutableSharedFlow<String>()

    // Start the listener undispatched so it is already subscribed to `events` by the
    // time `launch` returns. The flows are created without a replay cache, so a value
    // emitted before the subscription exists would be silently dropped (event "1"
    // could otherwise be lost in a race with the listener start-up).
    launch(start = CoroutineStart.UNDISPATCHED) {
        // `this` is the listener coroutine's own scope: per-event handlers become its children.
        listenToEventsDispatchToCommandsChannel(events, commands, this)
    }

    launch { events.emit("1") }
    delay(1000)
    launch { commands.emit("1") } // acknowledges event "1"
    delay(1000)
    launch { events.emit("2") }
    delay(1000)
    launch { commands.emit("99") } // matches no pending event
    launch { commands.emit("2") } // acknowledges event "2"
    delay(1000)
    launch { events.emit("3") } // never acknowledged
    delay(1000)
    launch { events.emit("4") } // never acknowledged
    launch { events.emit("5") }
    delay(1000)
    launch { commands.emit("5") } // acknowledges event "5" only
    delay(1000)
    launch { events.emit("6") } // never acknowledged
    launch { events.emit("7") } // never acknowledged
    delay(1000)
}
/**
 * Collects [events] and, for each event, starts a handler in [parentScope] that waits
 * for a command with the same value to be emitted on [commands].
 *
 * Every handler prints `Received Command: <event>` as soon as the matching command
 * arrives, or `Timeout, did not receive Command for Event: <event>` if none arrives
 * within [timeoutMillis]. Handlers for different events run concurrently and each one
 * observes [commands] through its own private channel.
 *
 * This function suspends for as long as the collection of [events] is running; cancel
 * the calling coroutine to stop listening.
 *
 * @param events flow of event identifiers to watch.
 * @param commands flow of command identifiers; a command acknowledges the event with the same value.
 * @param parentScope scope in which the per-event handlers are launched.
 * @param timeoutMillis how long a handler waits for its command, in milliseconds.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun listenToEventsDispatchToCommandsChannel(
    events: MutableSharedFlow<String>,
    commands: MutableSharedFlow<String>,
    parentScope: CoroutineScope,
    timeoutMillis: Long = 5000
) {
    events.collect { event ->
        println("Received Event: $event")
        // The handler and its forwarder are started undispatched so that the subscription
        // to `commands` is in place before the collector moves on. A command emitted right
        // after the event could otherwise be missed, because a shared flow without replay
        // drops values that have no subscriber yet.
        parentScope.launch(start = CoroutineStart.UNDISPATCHED) {
            val channel = Channel<String>()
            val forwardJob = launch(start = CoroutineStart.UNDISPATCHED) {
                forwardFlowToChannel(commands, channel)
            }
            val waitJob = launch { waitForCommand(channel, event) }
            select<Unit> {
                // waitJob finishes once the expected command has been read from the channel.
                waitJob.onJoin {
                    println("Received Command: $event")
                    forwardJob.cancel()
                }
                onTimeout(timeoutMillis) {
                    println("Timeout, did not receive Command for Event: $event")
                    forwardJob.cancel()
                    waitJob.cancel()
                }
            }
            channel.close()
        }
    }
}
/**
 * Pumps every value collected from [flow] into [channel], in the order it is received.
 *
 * Each value is handed over with `send`, so this suspends whenever the channel cannot
 * accept the value yet. Callers in this file stop the forwarding by cancelling the
 * coroutine that runs this function.
 */
suspend fun forwardFlowToChannel(flow: MutableSharedFlow<String>, channel: Channel<String>) {
    flow.collect { value ->
        channel.send(value)
    }
}
/**
 * Consumes [commandsChannel] until a command equal to [expectedCommand] is received.
 *
 * Non-matching commands are read and discarded. The function also returns when the
 * channel is closed and has no more elements, so a normal return does not by itself
 * prove that the expected command was seen.
 */
suspend fun waitForCommand(commandsChannel: ReceiveChannel<String>, expectedCommand: String) {
    val incoming = commandsChannel.iterator()
    while (incoming.hasNext()) {
        if (incoming.next() == expectedCommand) {
            return
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment