Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lgtout/eda0f59cb0d4d4f5990967349d517872 to your computer and use it in GitHub Desktop.
Save lgtout/eda0f59cb0d4d4f5990967349d517872 to your computer and use it in GitHub Desktop.
Experiment implementing amb/race operator using flow and without directly using channels
@OptIn(FlowPreview::class)
fun <A> Flow<A>.ambUsingJobsToCancelRacersAndEmittingFailures(
vararg others: Flow<A>
): Flow<Either<Throwable, A>> = flow {
val parentJob = currentCoroutineContext()[Job]!!
(listOf(this@ambUsingJobsToCancelRacersAndEmittingFailures) + others).foldIndexed(
Pair(emptyList<Job>(), emptyList<Flow<Pair<Int, Either<Throwable, A>>>>())
) { index, acc, curr ->
val job = Job(parentJob)
val flow = curr.map { Pair(index, it.right() as Either<Throwable, A>) }
.catch { emit(Pair(index, it.left() as Either<Throwable, A>)) }
.onCompletion { println("completion $index $it") }
.shareIn(CoroutineScope(job), SharingStarted.WhileSubscribed())
Pair(acc.first + job, acc.second + flow)
}.let { (jobs, flows) ->
var winnerIndex: Int? = null
flows.asFlow()
.flatMapMerge(concurrency = flows.size) { it }
.collect { (index, item) ->
if (winnerIndex == null) {
item.fold(ifLeft = {
println("cancelling $index")
jobs[index].cancelAndJoin()
}, ifRight = {
winnerIndex = index
println("winnerIndex $winnerIndex")
jobs.forEachIndexed { i, job ->
if (!job.isCancelled && i != winnerIndex) {
println("cancelling $i")
job.cancelAndJoin()
}
}
})
}
emit(item)
jobs.forEachIndexed { i, job ->
println("job cancelled $i ${job.isCancelled}")
println("job completed $i ${job.isCompleted}")
}
}
println("done!")
}
}
val e1 = Exception("Bang!")
val f1 = flow<Int> { throw e1 }.onStart { delay(50) }
val f2 = flowOf(1, 2, 3).onStart { delay(100) }
val f3 = flowOf(10, 20, 30).onStart { delay(150) }
val f4 = f1.ambUsingJobsToCancelRacersAndEmittingFailures(f2, f3)
f4.collect { // Suspends indefinitely, even after emitting all items in the winning flow.
println(it)
}
@lgtout
Copy link
Author

lgtout commented Apr 7, 2022

Output

completion 0 null
cancelling 0
Either.Left(java.lang.Exception: Bang!)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 false
job completed 2 false
completion 1 null
winnerIndex 1
cancelling 2
completion 2 kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@97269b4
Either.Right(1)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 true
job completed 2 true
Either.Right(2)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 true
job completed 2 true
Either.Right(3)
job cancelled 0 true
job completed 0 true
job cancelled 1 false
job completed 1 false
job cancelled 2 true
job completed 2 true

Then f4.collect suspends indefinitely.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment