Created
March 24, 2022 18:12
-
-
Save hoc081098/b7dcb65257a99ca933e74542186bf7c8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.ExperimentalCoroutinesApi | |
import kotlinx.coroutines.channels.ChannelResult | |
import kotlinx.coroutines.channels.onFailure | |
import kotlinx.coroutines.channels.onSuccess | |
import kotlinx.coroutines.channels.produce | |
import kotlinx.coroutines.coroutineScope | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.emitAll | |
import kotlinx.coroutines.flow.flow | |
import kotlinx.coroutines.selects.select | |
import kotlinx.coroutines.yield | |
@ExperimentalCoroutinesApi | |
public fun <T> race(flows: Iterable<Flow<T>>): Flow<T> = flow { | |
coroutineScope { | |
// 1. Collect to all source Flows | |
val channels = flows.map { flow -> | |
// Produce the values using the default (rendezvous) channel | |
produce { | |
flow.collect { | |
send(it) | |
yield() // Emulate fairness, giving each flow chance to emit | |
} | |
} | |
} | |
// If channels List is empty, just return and complete result Flow. | |
if (channels.isEmpty()) { | |
return@coroutineScope | |
} | |
// If channels List has single element, just forward all events from it. | |
channels | |
.singleOrNull() | |
?.let { return@coroutineScope emitAll(it) } | |
// 2. When a new event arrives from a source Flow, pass it down to a collector. | |
// Select expression makes it possible to await multiple suspending functions simultaneously | |
// and select the first one that becomes available. | |
val (winnerIndex, winnerResult) = select<Pair<Int, ChannelResult<T>>> { | |
channels.forEachIndexed { index, channel -> | |
channel.onReceiveCatching { | |
index to it | |
} | |
} | |
} | |
// 3. Cancel all other Flows. | |
channels.forEachIndexed { index, channel -> | |
if (index != winnerIndex) { | |
channel.cancel() | |
} | |
} | |
// 4. Forward all events from the winner Flow . | |
winnerResult | |
.onSuccess { | |
emit(it) | |
emitAll(channels[winnerIndex]) | |
} | |
.onFailure { | |
it?.let { throw it } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment