Last active
April 3, 2023 10:46
-
-
Save griffio/073e5f440971e7e19dbc3e2011c9ec07 to your computer and use it in GitHub Desktop.
HappyEyeBalls - does it work?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.flow.* | |
import kotlin.random.Random | |
import kotlin.time.Duration | |
import kotlin.time.Duration.Companion.milliseconds | |
import kotlin.time.Duration.Companion.seconds | |
suspend fun task1(): String { | |
println("<start>.task1") | |
//error("fail: task1") | |
delay(10.seconds) | |
println("<finish>.task1") | |
return "success: task1" | |
} | |
suspend fun task2(): String { | |
println("<start>.task2") | |
delay(1.seconds) | |
error("fail: task2") | |
return "success: task2" | |
} | |
suspend fun task3(): String { | |
println("<start>.task3") | |
//error("fail: task3") | |
delay(3.seconds) | |
println("<finish>.task3") | |
return "success: task3" | |
} | |
suspend fun task4(): String { | |
println("<start>.task4") | |
// error("fail: task4") | |
delay(2.seconds) | |
println("<finish>.task4") | |
return "success: task4" | |
} | |
suspend fun task5(): String { | |
println("<start>.task5") | |
delay(2.seconds) | |
println("<finish>.task5") | |
return "success: task5" | |
} | |
// wait for delay or error to be signaled first | |
suspend fun delayOrFail(channel: Channel<Unit>, delayBy: Duration) = channelFlow { | |
launch { send(delayTask(delayBy)) } | |
launch { send(failedTask(channel)) } | |
}.first() | |
//Another way using select | |
@OptIn(ExperimentalCoroutinesApi::class) | |
suspend fun delayOrFailAlt(failedTask: Channel<Unit>, delayBy: Duration) { | |
select { | |
failedTask.onReceive {} | |
onTimeout(delayBy) {} | |
} | |
} | |
suspend fun <T> failedTask(failed: Channel<T>) { | |
failed.receive() | |
} | |
suspend fun delayTask(delayBy: Duration) { | |
delay(delayBy) | |
} | |
suspend fun <T> happyEyeBalls(tasks: List<suspend () -> T>, delayedBy: Duration): Flow<T> = channelFlow { | |
val failedTask = Channel<Unit>(1) | |
// start the first task immediately | |
launch { | |
try { | |
send(tasks.first()()) | |
} catch (e: Exception) { | |
failedTask.trySend(Unit) // it failed, send message to channel | |
} | |
} | |
// start remaining tasks only after either waiting for delay or failed channel element is received | |
tasks.drop(1).forEach { task -> | |
delayOrFail(failedTask, delayedBy) // wait for delay or failed signal to continue next task | |
launch { | |
try { | |
send(task()) | |
} catch (e: Exception) { | |
failedTask.trySend(Unit) // task failed, send message to channel - next task will start | |
} | |
} | |
} | |
} | |
suspend fun main(): Unit = coroutineScope { | |
val tasks = listOf(::task1, ::task2, ::task3, ::task4, ::task5) | |
// the first task to succeed - the others are cancelled | |
val winner = happyEyeBalls(tasks, 2.seconds).first() | |
println(winner) // task3 is quickest - task5 doesn't start | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.flow.* | |
import kotlin.time.Duration | |
import kotlin.time.Duration.Companion.seconds | |
suspend fun task1(): String { | |
println("<start>.task1") | |
//error("fail: task1") | |
delay(10.seconds) | |
println("<finish>.task1") | |
return "success: task1" | |
} | |
suspend fun task2(): String { | |
println("<start>.task2") | |
delay(1.seconds) | |
error("fail: task2") | |
return "success: task2" | |
} | |
suspend fun task3(): String { | |
println("<start>.task3") | |
//error("fail: task3") | |
delay(3.seconds) | |
println("<finish>.task3") | |
return "success: task3" | |
} | |
suspend fun task4(): String { | |
println("<start>.task4") | |
// error("fail: task4") | |
delay(2.seconds) | |
println("<finish>.task4") | |
return "success: task4" | |
} | |
suspend fun task5(): String { | |
println("<start>.task5") | |
delay(2.seconds) | |
println("<finish>.task5") | |
return "success: task5" | |
} | |
@OptIn(FlowPreview::class) | |
suspend fun main() = coroutineScope { | |
val tasks = listOf( | |
::task1.asFlow(), | |
::task2.asFlow(), | |
::task3.asFlow(), | |
::task4.asFlow(), | |
::task5.asFlow(), | |
) | |
val first = happyEyeBalls(tasks, 2.seconds).first() | |
println(first) // task3 is quickest - task5 doesn't start | |
} | |
@OptIn(FlowPreview::class) | |
suspend fun happyEyeBalls(tasks: List<Flow<String>>, delayedBy: Duration): Flow<String> { | |
val failedTask = Channel<Unit>(1) | |
return when (tasks.size) { | |
0 -> error("no tasks") | |
1 -> tasks.first().catch { failedTask.trySend(Unit) } | |
else -> { | |
merge( | |
tasks.first().catch { failedTask.trySend(Unit) }, | |
flow { emit(delayOrFail(failedTask, delayedBy)) }.flatMapMerge { | |
happyEyeBalls(tasks.drop(1), delayedBy) // delayOrFail flatmap with recursive tasks | |
} | |
) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment