Skip to content

Instantly share code, notes, and snippets.

@griffio
Last active April 3, 2023 10:46
Show Gist options
  • Save griffio/073e5f440971e7e19dbc3e2011c9ec07 to your computer and use it in GitHub Desktop.
Save griffio/073e5f440971e7e19dbc3e2011c9ec07 to your computer and use it in GitHub Desktop.
HappyEyeBalls - does it work?
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
suspend fun task1(): String {
println("<start>.task1")
//error("fail: task1")
delay(10.seconds)
println("<finish>.task1")
return "success: task1"
}
suspend fun task2(): String {
println("<start>.task2")
delay(1.seconds)
error("fail: task2")
return "success: task2"
}
suspend fun task3(): String {
println("<start>.task3")
//error("fail: task3")
delay(3.seconds)
println("<finish>.task3")
return "success: task3"
}
suspend fun task4(): String {
println("<start>.task4")
// error("fail: task4")
delay(2.seconds)
println("<finish>.task4")
return "success: task4"
}
suspend fun task5(): String {
println("<start>.task5")
delay(2.seconds)
println("<finish>.task5")
return "success: task5"
}
// wait for delay or error to be signaled first
suspend fun delayOrFail(channel: Channel<Unit>, delayBy: Duration) = channelFlow {
launch { send(delayTask(delayBy)) }
launch { send(failedTask(channel)) }
}.first()
//Another way using select
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun delayOrFailAlt(failedTask: Channel<Unit>, delayBy: Duration) {
select {
failedTask.onReceive {}
onTimeout(delayBy) {}
}
}
suspend fun <T> failedTask(failed: Channel<T>) {
failed.receive()
}
suspend fun delayTask(delayBy: Duration) {
delay(delayBy)
}
suspend fun <T> happyEyeBalls(tasks: List<suspend () -> T>, delayedBy: Duration): Flow<T> = channelFlow {
val failedTask = Channel<Unit>(1)
// start the first task immediately
launch {
try {
send(tasks.first()())
} catch (e: Exception) {
failedTask.trySend(Unit) // it failed, send message to channel
}
}
// start remaining tasks only after either waiting for delay or failed channel element is received
tasks.drop(1).forEach { task ->
delayOrFail(failedTask, delayedBy) // wait for delay or failed signal to continue next task
launch {
try {
send(task())
} catch (e: Exception) {
failedTask.trySend(Unit) // task failed, send message to channel - next task will start
}
}
}
}
suspend fun main(): Unit = coroutineScope {
val tasks = listOf(::task1, ::task2, ::task3, ::task4, ::task5)
// the first task to succeed - the others are cancelled
val winner = happyEyeBalls(tasks, 2.seconds).first()
println(winner) // task3 is quickest - task5 doesn't start
}
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
suspend fun task1(): String {
println("<start>.task1")
//error("fail: task1")
delay(10.seconds)
println("<finish>.task1")
return "success: task1"
}
suspend fun task2(): String {
println("<start>.task2")
delay(1.seconds)
error("fail: task2")
return "success: task2"
}
suspend fun task3(): String {
println("<start>.task3")
//error("fail: task3")
delay(3.seconds)
println("<finish>.task3")
return "success: task3"
}
suspend fun task4(): String {
println("<start>.task4")
// error("fail: task4")
delay(2.seconds)
println("<finish>.task4")
return "success: task4"
}
suspend fun task5(): String {
println("<start>.task5")
delay(2.seconds)
println("<finish>.task5")
return "success: task5"
}
@OptIn(FlowPreview::class)
suspend fun main() = coroutineScope {
val tasks = listOf(
::task1.asFlow(),
::task2.asFlow(),
::task3.asFlow(),
::task4.asFlow(),
::task5.asFlow(),
)
val first = happyEyeBalls(tasks, 2.seconds).first()
println(first) // task3 is quickest - task5 doesn't start
}
@OptIn(FlowPreview::class)
suspend fun happyEyeBalls(tasks: List<Flow<String>>, delayedBy: Duration): Flow<String> {
val failedTask = Channel<Unit>(1)
return when (tasks.size) {
0 -> error("no tasks")
1 -> tasks.first().catch { failedTask.trySend(Unit) }
else -> {
merge(
tasks.first().catch { failedTask.trySend(Unit) },
flow { emit(delayOrFail(failedTask, delayedBy)) }.flatMapMerge {
happyEyeBalls(tasks.drop(1), delayedBy) // delayOrFail flatmap with recursive tasks
}
)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment