Skip to content

Instantly share code, notes, and snippets.

@Maartyl
Created October 15, 2020 07:58
Show Gist options
  • Save Maartyl/c44ea10fe04d6fcf924abe36099888b8 to your computer and use it in GitHub Desktop.
Save Maartyl/c44ea10fe04d6fcf924abe36099888b8 to your computer and use it in GitHub Desktop.
Pausing flow, based on 01702dbab744595a8dbdd41befe6829c
package cz.maartyl.ex
/* Output
[ORIGINAL OUTPUT]
Collecting 2 elements…
upstream -> hot
emit: 0
resumed
collect: 0
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
emit: 1
collect: 1
Expensive work on 1 (isPaused = false)…
Done.
Waiting 10s to see that the Flow really pauses.
paused
Expensive work on 1 (isPaused = true)…
<nothing happening for a while> //difference, but I think not really
Collecting 2 more elements…
resumed
collect: 1
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
emit: 2
collect: 2
Expensive work on 2 (isPaused = false)…
Looks good?
paused
upstream -> cold
[CoroutinePausing OUTPUT]
Collecting 2 elements…
upstream -> hot
emit: 0
paused //difference: explained in comment
resumed
collect: 0
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
emit: 1
collect: 1
Expensive work on 1 (isPaused = false)…
Done.
Waiting 10s to see that the Flow really pauses.
paused
Expensive work on 1 (isPaused = true)…
Collecting 2 more elements…
resumed
collect: 1
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
emit: 2
collect: 2
Expensive work on 2 (isPaused = false)…
Looks good?
paused
upstream -> cold
*/
//import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import maartyl.emko.util.pausing.*
//for now, this was private in my impl; just placing it here for the example
@ExperimentalCoroutinesApi
private val pausingAlwaysFalse = MutableStateFlow(false).asCoroutinePausing()
@ExperimentalCoroutinesApi
suspend fun main(): Unit = coroutineScope {
//a normal flow: not a special `pausingFlow`
// - could be after any number of operators, or middle passed through a library or anything
val flow0 = flow {
println("upstream -> hot")
try {
//normally, you don't even need to access this, but to mimic all your calls
// - my lib is immature: probably might be a useful function to have
val cp = currentCoroutineContext()[CoroutinePausing] ?: pausingAlwaysFalse
//(using the outer scope is good enough for the example)
launch {
cp.isPaused.collect {
if (it) {
println("paused")
} else {
println("resumed")
}
}
}
var value = 0
println("emit: $value")
emit(value)
while (true) {
repeat(5) { // expensive work
awaitUnpaused() //just put this before any expensive work
delay(1_000)
val isPaused = cp.isPaused.value
println("Expensive work on $value (isPaused = $isPaused)…")
}
value += 1
println("emit: $value")
emit(value)
}
} finally {
println("upstream -> cold")
}
}
//!!! pausingStateIn would be just `shareIn` but it does not implement it yet
val flow = flow0.pausingStateIn(this)
println("Collecting 2 elements…")
try {
flow.collect {
println("collect: $it")
if (it == 1) {
delay(1_500)
throw NumberFormatException()
}
}
} catch (e: NumberFormatException) {
}
println("Done.")
println("Waiting 10s to see that the Flow really pauses.")
delay(10_000)
println("Collecting 2 more elements…")
try {
flow.collect {
println("collect: $it")
if (it == 2) {
delay(1_500)
throw NumberFormatException()
}
}
} catch (e: NumberFormatException) {
}
println("Looks good?")
coroutineContext.cancelChildren()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment