Last active
April 8, 2022 00:07
-
-
Save lgtout/eda0f59cb0d4d4f5990967349d517872 to your computer and use it in GitHub Desktop.
Experiment implementing amb/race operator using flow and without directly using channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@OptIn(FlowPreview::class) | |
fun <A> Flow<A>.ambUsingJobsToCancelRacersAndEmittingFailures( | |
vararg others: Flow<A> | |
): Flow<Either<Throwable, A>> = flow { | |
val parentJob = currentCoroutineContext()[Job]!! | |
(listOf(this@ambUsingJobsToCancelRacersAndEmittingFailures) + others).foldIndexed( | |
Pair(emptyList<Job>(), emptyList<Flow<Pair<Int, Either<Throwable, A>>>>()) | |
) { index, acc, curr -> | |
val job = Job(parentJob) | |
val flow = curr.map { Pair(index, it.right() as Either<Throwable, A>) } | |
.catch { emit(Pair(index, it.left() as Either<Throwable, A>)) } | |
.onCompletion { println("completion $index $it") } | |
.shareIn(CoroutineScope(job), SharingStarted.WhileSubscribed()) | |
Pair(acc.first + job, acc.second + flow) | |
}.let { (jobs, flows) -> | |
var winnerIndex: Int? = null | |
flows.asFlow() | |
.flatMapMerge(concurrency = flows.size) { it } | |
.collect { (index, item) -> | |
if (winnerIndex == null) { | |
item.fold(ifLeft = { | |
println("cancelling $index") | |
jobs[index].cancelAndJoin() | |
}, ifRight = { | |
winnerIndex = index | |
println("winnerIndex $winnerIndex") | |
jobs.forEachIndexed { i, job -> | |
if (!job.isCancelled && i != winnerIndex) { | |
println("cancelling $i") | |
job.cancelAndJoin() | |
} | |
} | |
}) | |
} | |
emit(item) | |
jobs.forEachIndexed { i, job -> | |
println("job cancelled $i ${job.isCancelled}") | |
println("job completed $i ${job.isCompleted}") | |
} | |
} | |
println("done!") | |
} | |
} | |
val e1 = Exception("Bang!") | |
val f1 = flow<Int> { throw e1 }.onStart { delay(50) } | |
val f2 = flowOf(1, 2, 3).onStart { delay(100) } | |
val f3 = flowOf(10, 20, 30).onStart { delay(150) } | |
val f4 = f1.ambUsingJobsToCancelRacersAndEmittingFailures(f2, f3) | |
f4.collect { // Suspends indefinitely, even after emitting all items in the winning flow. | |
println(it) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output
Then
f4.collect
suspends indefinitely.