Skip to content

Instantly share code, notes, and snippets.

@phiSgr
Created December 21, 2021 14:09
Show Gist options
  • Save phiSgr/ced066dfe7852de847de130a32068cb9 to your computer and use it in GitHub Desktop.
Save phiSgr/ced066dfe7852de847de130a32068cb9 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
data class AllFailed(val failures: List<Throwable>) : Throwable()
suspend fun <T : Any> requestHedging(
providers: List<suspend () -> T>,
duration: Long,
close: suspend (T) -> Unit = {}
): T = supervisorScope {
val result = CompletableDeferred<T>()
val failureSignal = Channel<Throwable>()
fun start(i: Int) {
launch {
val t = try {
providers[i]()
} catch (e: Throwable) {
failureSignal.send(e)
throw e
}
val won = result.complete(t)
if (!won) {
withContext(NonCancellable) { close(t) }
}
}
}
start(0)
var next = 1
var res: T? = null
val failures = mutableListOf<Throwable>()
while (res == null) {
val allStarted = next == providers.size
val thisLoopResult = select<T?> {
result.onAwait { it }
if (!allStarted) {
onTimeout(duration) { null }
}
failureSignal.onReceive {
failures.add(it)
if (failures.size == providers.size) {
throw AllFailed(failures)
}
null
}
}
if (thisLoopResult == null) {
if (!allStarted) {
start(next)
next += 1
}
} else {
coroutineContext.cancelChildren()
res = thisLoopResult
}
}
res
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment