Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active October 14, 2020 21:29
Show Gist options
  • Save fluidsonic/01702dbab744595a8dbdd41befe6829c to your computer and use it in GitHub Desktop.
Save fluidsonic/01702dbab744595a8dbdd41befe6829c to your computer and use it in GitHub Desktop.
/* Output
Collecting 2 elements…
upstream -> hot
emit: 0
resumed
collect: 0
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
emit: 1
collect: 1
Expensive work on 1 (isPaused = false)…
Done.
Waiting 10s to see that the Flow really pauses.
paused
Expensive work on 1 (isPaused = true)…
<nothing happening for a while>
Collecting 2 more elements…
resumed
collect: 1
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
emit: 2
collect: 2
Expensive work on 2 (isPaused = false)…
Looks good?
paused
upstream -> cold
*/
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
val flow = pausableFlow<Int> {
println("upstream -> hot")
try {
onPause { println("paused") }
onResume { println("resumed") }
var value = 0
println("emit: $value")
emit(value)
while (true) {
repeat(5) { // expensive work
joinPause {
delay(1_000)
println("Expensive work on $value (isPaused = $isPaused)…")
}
}
value += 1
println("emit: $value")
emit(value)
}
} finally {
println("upstream -> cold")
}
}
println("Collecting 2 elements…")
try {
flow.collect {
println("collect: $it")
if (it == 1) {
delay(1_500)
throw NumberFormatException()
}
}
} catch (e: NumberFormatException) {
}
println("Done.")
println("Waiting 10s to see that the Flow really pauses.")
delay(10_000)
println("Collecting 2 more elements…")
try {
flow.collect {
println("collect: $it")
if (it == 2) {
delay(1_500)
throw NumberFormatException()
}
}
} catch (e: NumberFormatException) {
}
println("Looks good?")
coroutineContext.cancelChildren()
}
interface PausableFlowCollector<T> : FlowCollector<T> {
val isPaused: Boolean
suspend fun joinPause()
fun onPause(block: suspend () -> Unit)
fun onResume(block: suspend () -> Unit)
}
suspend inline fun <T, R> PausableFlowCollector<T>.joinPause(block: () -> R): R =
try {
joinPause()
block()
} finally {
joinPause()
}
private class PausableFlowCollectorImpl<T> : PausableFlowCollector<T> {
var _isPaused = atomic(true)
val flow = MutableSharedFlow<T>(replay = 1)
var pause = CompletableDeferred<Unit>()
val onPause = mutableListOf<suspend () -> Unit>()
val onResume = mutableListOf<suspend () -> Unit>()
override suspend fun emit(value: T) {
joinPause {
flow.emit(value)
}
}
override val isPaused
get() = _isPaused.value
override suspend fun joinPause() {
pause.await()
}
override fun onPause(block: suspend () -> Unit) {
onPause += block
}
override fun onResume(block: suspend () -> Unit) {
onResume += block
}
}
fun <T> CoroutineScope.pausableFlow(block: suspend PausableFlowCollector<T>.() -> Unit): SharedFlow<T> =
PausableFlowCollectorImpl<T>()
.apply {
launch {
flow.subscriptionCount.collect { count ->
when {
count > 0 -> {
if (_isPaused.getAndSet(false)) {
onResume.forEach { it() }
pause.complete(Unit)
}
}
count == 0 -> {
if (!_isPaused.getAndSet(true)) {
onPause.forEach { it() }
pause = CompletableDeferred()
}
}
}
}
}
launch {
block()
}
}
.flow
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment