Last active
October 13, 2017 05:15
-
-
Save detouched/73f366cb730ece629b9141dea96d3a98 to your computer and use it in GitHub Desktop.
Kotlin Coroutines: select first _successful_ clause
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Waits for the result of multiple suspending functions simultaneously. The caller is suspended until | |
* one of the clauses is _selected_ or all clauses _fail_. | |
* | |
* @param deferredList a list of [clauses][Deferred] to select a result from | |
* @param cancelOtherJobsWhenDone a flag whether all [Deferred] from the provided [deferredList] which are | |
* still active at the moment of getting the first result should be cancelled | |
* @return a [Pair] of a result of the _selected_ clause, if present, and a [List] of the _failed_ clauses | |
* gathered at the moment of getting that result. If all clauses _fail_, the result - a [Pair] of `null` | |
* and a list of all clauses, - will be returned after the last clause fails. | |
*/ | |
suspend fun <R, D : Deferred<R>> selectFirstSuccessful(deferredList: List<D>, | |
cancelOtherJobsWhenDone: Boolean = true): Pair<R?, List<D>> = | |
selectFirstSuccessful(deferredList, listOf()) | |
.let { (result, otherDeferred, failedDeferred) -> | |
if (cancelOtherJobsWhenDone) otherDeferred.forEach { it.cancel() } | |
result to failedDeferred | |
} | |
/** | |
* Waits for the result of multiple suspending functions simultaneously. The caller is suspended until | |
* one of the clauses is _selected_ or all clauses _fail_. | |
* | |
* @param deferredList a list of [clauses][Deferred] to select a result from | |
* | |
* @return a [Triple] of a result of the _selected_ clause, if present, a [List] of the clauses that were | |
* active at the moment of getting that result, and a [List] of the _failed_ clauses gathered at the moment of | |
* getting that result. If all clauses _fail_, the result - a [Triple] of `null`, empty list and a list | |
* of all clauses, - will be returned after the last clause fails. | |
*/ | |
suspend fun <R, D : Deferred<R>> selectFirstSuccessfulAndOther(deferredList: List<D>): Triple<R?, List<D>, List<D>> = | |
selectFirstSuccessful(deferredList, listOf()) | |
private suspend fun <R, D : Deferred<R>> selectFirstSuccessful(deferredList: List<D>, | |
failedDeferredList: List<D>): Triple<R?, List<D>, List<D>> = | |
if (deferredList.isEmpty()) { | |
Triple(null, deferredList, failedDeferredList) | |
} else { | |
try { | |
select<Triple<R?, List<D>, List<D>>> { | |
deferredList.forEach { deferred -> | |
deferred.onAwait { result -> Triple(result, deferredList - deferred, failedDeferredList) } | |
} | |
} | |
} catch (e: Exception) { | |
val completedExceptionally = deferredList.filter { it.isCompletedExceptionally } | |
val notCompletedExceptionally = deferredList - completedExceptionally | |
selectFirstSuccessful(notCompletedExceptionally, completedExceptionally + failedDeferredList) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun asyncStringOrException(random: Random) = async { | |
val time = random.nextInt(1000) | |
delay(time.toLong()) | |
if (random.nextInt(100) > 90) "Waited for $time ms" | |
else throw RuntimeException("I'm tired after $time ms") | |
} | |
fun generateList() = Random(1).let { random -> List(10) { asyncStringOrException(random) } } | |
fun testFirstSuccessful() = runBlocking { | |
val (result, failedResults) = selectFirstSuccessful(generateList()) | |
println("Result: $result") | |
println("${failedResults.size} coroutines have failed at the time of getting the result") | |
failedResults.forEach { println(" Fail reason: ${it.getCompletionExceptionOrNull()}") } | |
} | |
fun testFirstSuccessfulAndOther() = runBlocking { | |
val (result, otherCoroutines, failedResults) = selectFirstSuccessfulAndOther(generateList()) | |
println("Result: $result") | |
println("${otherCoroutines.size} coroutines still running at the time of getting the result") | |
println("${failedResults.size} coroutines have failed at the time of getting the result") | |
} | |
fun main(args: Array<String>) { | |
testFirstSuccessful() | |
println() | |
testFirstSuccessfulAndOther() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The
Random(1)
is to make the the following result reproducible, but it can obviously be changed to have any seed: