Skip to content

Instantly share code, notes, and snippets.

@Maartyl
Created October 15, 2020 15:47
Show Gist options
  • Save Maartyl/ee0d88b4f9ecc5f816deeaccc80bf4e2 to your computer and use it in GitHub Desktop.
Save Maartyl/ee0d88b4f9ecc5f816deeaccc80bf4e2 to your computer and use it in GitHub Desktop.
@ExperimentalCoroutinesApi
fun <T> Flow<T>.pausingShareIn(
scope: CoroutineScope,
started: SharingStarted = SharingStarted.Lazily,
replay: Int = 0,
): SharedFlow<T> = PausingSharedFlowContext<T>(scope).makeFlow(this, started, replay)
private typealias LBlock = suspend CoroutineScope.() -> Unit
@ExperimentalCoroutinesApi
private class PausingSharedFlowContext<T>(argScope: CoroutineScope) : CoroutineScope {
val paused = MutableStateFlow(true) //paused by no collectors
val unpausedCollectorCount = AtomicInteger(0)
override val coroutineContext: CoroutineContext = argScope.coroutineContext.plusMergePausing(paused)
fun unpausedCollectorCountChanged() {
//always use latest value, (multiple changes can happen concurrenctly)
paused.value = unpausedCollectorCount.get() == 0
}
fun makeFlow(
upstream: Flow<T>,
started: SharingStarted = SharingStarted.Lazily,
replay: Int = 0,
): SharedFlow<T> = wrapSharedFlow(upstream.shareIn(this, started, replay))
fun wrapSharedFlow(sf: SharedFlow<T>): SharedFlow<T> {
//subscriptionCount would not be enough anyway (must use count of unpaused cont; not just count)
//TOUP: wrap directly (Unsafe Flow)
// - can avoid wrapper object
val ff = flow {
val lb = currentCoroutineContext()[CoroutinePausing]
?.takeUnless { it === pausingAlwaysFalse }
?.let { { adjustingCount(it) } }
?: singleAdjusterForAlwaysFalse
//scope pausing is ignored here .... although... depends on impl of cp StateFlow... - so explicitly.
val pj = this@PausingSharedFlowContext.launch(pausingAlwaysFalse, block = lb)
try {
emitAll(sf)
} finally {
pj.cancel()
}
}
return object : SharedFlow<T>, Flow<T> by ff {
override val replayCache: List<T> get() = sf.replayCache
}
}
//does this work? can I reuse a single instance?
val singleAdjusterForAlwaysFalse: LBlock = {
try {
unpausedCollectorCount.incrementAndGet()
unpausedCollectorCountChanged()
awaitCancellation()
} finally {
unpausedCollectorCount.decrementAndGet()
unpausedCollectorCountChanged()
}
}
private suspend fun adjustingCount(cp: CoroutinePausing) {
val s = cp.isPaused
var wasPaused = true //my contribution to unpausedCollectorCount; false=1; true=0
try {
s.collect { collectorPaused ->
if (collectorPaused) {
if (!wasPaused)
unpausedCollectorCount.decrementAndGet()
} else {
if (wasPaused)
unpausedCollectorCount.incrementAndGet()
}
wasPaused = collectorPaused
unpausedCollectorCountChanged()
}
} finally {
if (!wasPaused)
unpausedCollectorCount.decrementAndGet()
unpausedCollectorCountChanged()
}
}
}
@ExperimentalCoroutinesApi
internal fun mergePausing( //UNTESTED impl @200910
coroutineContext: CoroutineContext,
isPaused: StateFlow<Boolean>
): CoroutinePausing {
val pCtx = coroutineContext[CoroutinePausing]
return if (pCtx == null) isPaused.asCoroutinePausing() else {
val p1 = pCtx.isPaused
val p2 = isPaused
val combined = flow {
coroutineScope {
val c = MutableStateFlow(p2.value || p1.value)
//I think even this is a lot more efficient than combine (which makes (fair) channels underneath)
// - COULD even use single MutableStateFlow, instead of inside each collect
// - but that would require coroutines running until coroutineContext[Job] cancels them...
//maybe: launch(pausingAlwaysFalse)
launch { p1.collect { c.value = it || p2.value } }
launch { p2.collect { c.value = it || p1.value } }
emitAll(c)
}
}
object : CoroutinePausing, StateFlow<Boolean>, Flow<Boolean> by combined {
override val isPaused = this
override val value: Boolean get() = p2.value || p1.value
override val replayCache: List<Boolean> get() = listOf(value)
}
}
}
@ExperimentalCoroutinesApi
internal fun CoroutineContext.plusMergePausing(isPaused: StateFlow<Boolean>): CoroutineContext =
this + mergePausing(this, isPaused)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment