Skip to content

Instantly share code, notes, and snippets.

@hoc081098
Created March 24, 2022 18:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoc081098/b7dcb65257a99ca933e74542186bf7c8 to your computer and use it in GitHub Desktop.
Save hoc081098/b7dcb65257a99ca933e74542186bf7c8 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.onSuccess
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.yield
/**
 * Races the given [flows] against each other and mirrors the one that responds first.
 *
 * Every source is collected concurrently into its own channel. The first channel to
 * deliver a receive result (a value or a close) wins, all other channels are cancelled,
 * and the winner's events are forwarded downstream.
 *
 * Special cases:
 * - no sources: the returned [Flow] completes without emitting;
 * - exactly one source: all of its events are forwarded as-is;
 * - the winning channel was closed before delivering a value: nothing is emitted and
 *   the closing exception, if there is one, is rethrown.
 *
 * @param flows the competing source flows; collection starts in iteration order.
 * @return a [Flow] that emits the events of the winning source.
 */
@ExperimentalCoroutinesApi
public fun <T> race(flows: Iterable<Flow<T>>): Flow<T> = flow {
    coroutineScope {
        // Step 1: start collecting every source, each one feeding its own
        // default (rendezvous) channel.
        val sources = flows.map { source ->
            produce {
                source.collect { value ->
                    send(value)
                    // Emulates fairness: step aside so the other sources get a chance to emit.
                    yield()
                }
            }
        }

        when (sources.size) {
            // Nothing to race: just return and let the resulting Flow complete.
            0 -> return@coroutineScope
            // A single contestant: forward all of its events.
            1 -> return@coroutineScope emitAll(sources.single())
        }

        // Step 2: wait on all channels at once. `select` suspends until one of the
        // registered clauses becomes available and yields that clause's result,
        // here paired with the position of the channel it came from.
        val (winnerPosition, firstResult) = select<Pair<Int, ChannelResult<T>>> {
            for ((position, source) in sources.withIndex()) {
                source.onReceiveCatching { result -> position to result }
            }
        }

        // Step 3: cancel every channel except the winner's.
        sources
            .filterIndexed { position, _ -> position != winnerPosition }
            .forEach { loser -> loser.cancel() }

        // Step 4: pass on the value that decided the race, then everything else
        // the winner produces. If the winner was closed instead, rethrow its
        // closing exception when there is one.
        firstResult
            .onSuccess { firstValue ->
                emit(firstValue)
                emitAll(sources[winnerPosition])
            }
            .onFailure { cause ->
                if (cause != null) throw cause
            }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment