Created
October 15, 2020 07:58
-
-
Save Maartyl/c44ea10fe04d6fcf924abe36099888b8 to your computer and use it in GitHub Desktop.
Pausing flow, based on 01702dbab744595a8dbdd41befe6829c
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cz.maartyl.ex | |
/* Output | |
[ORIGINAL OUTPUT] | |
Collecting 2 elements… | |
upstream -> hot | |
emit: 0 | |
resumed | |
collect: 0 | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
emit: 1 | |
collect: 1 | |
Expensive work on 1 (isPaused = false)… | |
Done. | |
Waiting 10s to see that the Flow really pauses. | |
paused | |
Expensive work on 1 (isPaused = true)… | |
<nothing happening for a while> //difference, but I think not really | |
Collecting 2 more elements… | |
resumed | |
collect: 1 | |
Expensive work on 1 (isPaused = false)… | |
Expensive work on 1 (isPaused = false)… | |
Expensive work on 1 (isPaused = false)… | |
emit: 2 | |
collect: 2 | |
Expensive work on 2 (isPaused = false)… | |
Looks good? | |
paused | |
upstream -> cold | |
[CoroutinePausing OUTPUT] | |
Collecting 2 elements… | |
upstream -> hot | |
emit: 0 | |
paused //difference: explained in comment | |
resumed | |
collect: 0 | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
Expensive work on 0 (isPaused = false)… | |
emit: 1 | |
collect: 1 | |
Expensive work on 1 (isPaused = false)… | |
Done. | |
Waiting 10s to see that the Flow really pauses. | |
paused | |
Expensive work on 1 (isPaused = true)… | |
Collecting 2 more elements… | |
resumed | |
collect: 1 | |
Expensive work on 1 (isPaused = false)… | |
Expensive work on 1 (isPaused = false)… | |
Expensive work on 1 (isPaused = false)… | |
emit: 2 | |
collect: 2 | |
Expensive work on 2 (isPaused = false)… | |
Looks good? | |
paused | |
upstream -> cold | |
*/ | |
//import kotlinx.atomicfu.* | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
import maartyl.emko.util.pausing.* | |
/**
 * Fallback pausing handle used when the current coroutine context carries no
 * [CoroutinePausing] element. It is built from a state flow initialised to `false`
 * that this file never updates, so it presumably always reports "not paused".
 *
 * This was private in the library implementation; it is reproduced here only for the example.
 */
@ExperimentalCoroutinesApi
private val pausingAlwaysFalse = MutableStateFlow(value = false).asCoroutinePausing()
/**
 * Demo entry point for a pausable upstream flow.
 *
 * Builds an ordinary `flow { }` that prints its lifecycle ("upstream -> hot" / "upstream -> cold"),
 * its emissions and its simulated expensive work, then exposes it through `pausingStateIn`.
 * The result is collected twice with a 10 second gap in between, so the console output shows
 * what the upstream does while nobody is collecting. Finally all child coroutines of the
 * enclosing scope are cancelled. Sample output is listed in the comment at the top of the file.
 */
@ExperimentalCoroutinesApi
suspend fun main(): Unit = coroutineScope {
    // A normal flow, not a special `pausingFlow`: it could just as well sit behind any number
    // of operators or be passed through a library.
    val flow0 = flow {
        println("upstream -> hot")
        try {
            // Normally there is no need to access this element at all; it is fetched here only so
            // the demo can print the pause state. Falls back to the default handle when absent.
            val pausing = currentCoroutineContext()[CoroutinePausing] ?: pausingAlwaysFalse
            // Reporter coroutine, launched in the outer scope (good enough for the example).
            launch {
                pausing.isPaused.collect { paused ->
                    println(if (paused) "paused" else "resumed")
                }
            }
            var value = 0
            while (true) {
                println("emit: $value")
                emit(value)
                repeat(5) { // expensive work
                    awaitUnpaused() // just put this before any expensive work
                    delay(1_000)
                    println("Expensive work on $value (isPaused = ${pausing.isPaused.value})…")
                }
                value += 1
            }
        } finally {
            println("upstream -> cold")
        }
    }

    // !!! `pausingStateIn` would be just `shareIn`, but it does not implement it yet.
    val flow = flow0.pausingStateIn(this)

    // Collects from `flow`, printing every value; once `last` is seen it waits 1.5 s and then
    // aborts the collection by throwing. The exception is swallowed here on purpose: it is
    // used only as the way to stop collecting.
    suspend fun collectThrough(last: Int) {
        try {
            flow.collect {
                println("collect: $it")
                if (it == last) {
                    delay(1_500)
                    throw NumberFormatException()
                }
            }
        } catch (e: NumberFormatException) {
            // expected: signals the end of this collection round
        }
    }

    println("Collecting 2 elements…")
    collectThrough(last = 1)
    println("Done.")

    println("Waiting 10s to see that the Flow really pauses.")
    delay(10_000)

    println("Collecting 2 more elements…")
    collectThrough(last = 2)
    println("Looks good?")

    // Stops the upstream and the reporter coroutine so that `coroutineScope` can return.
    coroutineContext.cancelChildren()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment