Instantly share code, notes, and snippets.
Maartyl/pausingShareIn.kt Secret
Created
October 15, 2020 15:47
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save Maartyl/ee0d88b4f9ecc5f816deeaccc80bf4e2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@ExperimentalCoroutinesApi | |
fun <T> Flow<T>.pausingShareIn( | |
scope: CoroutineScope, | |
started: SharingStarted = SharingStarted.Lazily, | |
replay: Int = 0, | |
): SharedFlow<T> = PausingSharedFlowContext<T>(scope).makeFlow(this, started, replay) | |
private typealias LBlock = suspend CoroutineScope.() -> Unit | |
@ExperimentalCoroutinesApi | |
private class PausingSharedFlowContext<T>(argScope: CoroutineScope) : CoroutineScope { | |
val paused = MutableStateFlow(true) //paused by no collectors | |
val unpausedCollectorCount = AtomicInteger(0) | |
override val coroutineContext: CoroutineContext = argScope.coroutineContext.plusMergePausing(paused) | |
fun unpausedCollectorCountChanged() { | |
//always use latest value, (multiple changes can happen concurrenctly) | |
paused.value = unpausedCollectorCount.get() == 0 | |
} | |
fun makeFlow( | |
upstream: Flow<T>, | |
started: SharingStarted = SharingStarted.Lazily, | |
replay: Int = 0, | |
): SharedFlow<T> = wrapSharedFlow(upstream.shareIn(this, started, replay)) | |
fun wrapSharedFlow(sf: SharedFlow<T>): SharedFlow<T> { | |
//subscriptionCount would not be enough anyway (must use count of unpaused cont; not just count) | |
//TOUP: wrap directly (Unsafe Flow) | |
// - can avoid wrapper object | |
val ff = flow { | |
val lb = currentCoroutineContext()[CoroutinePausing] | |
?.takeUnless { it === pausingAlwaysFalse } | |
?.let { { adjustingCount(it) } } | |
?: singleAdjusterForAlwaysFalse | |
//scope pausing is ignored here .... although... depends on impl of cp StateFlow... - so explicitly. | |
val pj = this@PausingSharedFlowContext.launch(pausingAlwaysFalse, block = lb) | |
try { | |
emitAll(sf) | |
} finally { | |
pj.cancel() | |
} | |
} | |
return object : SharedFlow<T>, Flow<T> by ff { | |
override val replayCache: List<T> get() = sf.replayCache | |
} | |
} | |
//does this work? can I reuse a single instance? | |
val singleAdjusterForAlwaysFalse: LBlock = { | |
try { | |
unpausedCollectorCount.incrementAndGet() | |
unpausedCollectorCountChanged() | |
awaitCancellation() | |
} finally { | |
unpausedCollectorCount.decrementAndGet() | |
unpausedCollectorCountChanged() | |
} | |
} | |
private suspend fun adjustingCount(cp: CoroutinePausing) { | |
val s = cp.isPaused | |
var wasPaused = true //my contribution to unpausedCollectorCount; false=1; true=0 | |
try { | |
s.collect { collectorPaused -> | |
if (collectorPaused) { | |
if (!wasPaused) | |
unpausedCollectorCount.decrementAndGet() | |
} else { | |
if (wasPaused) | |
unpausedCollectorCount.incrementAndGet() | |
} | |
wasPaused = collectorPaused | |
unpausedCollectorCountChanged() | |
} | |
} finally { | |
if (!wasPaused) | |
unpausedCollectorCount.decrementAndGet() | |
unpausedCollectorCountChanged() | |
} | |
} | |
} | |
@ExperimentalCoroutinesApi | |
internal fun mergePausing( //UNTESTED impl @200910 | |
coroutineContext: CoroutineContext, | |
isPaused: StateFlow<Boolean> | |
): CoroutinePausing { | |
val pCtx = coroutineContext[CoroutinePausing] | |
return if (pCtx == null) isPaused.asCoroutinePausing() else { | |
val p1 = pCtx.isPaused | |
val p2 = isPaused | |
val combined = flow { | |
coroutineScope { | |
val c = MutableStateFlow(p2.value || p1.value) | |
//I think even this is a lot more efficient than combine (which makes (fair) channels underneath) | |
// - COULD even use single MutableStateFlow, instead of inside each collect | |
// - but that would require coroutines running until coroutineContext[Job] cancels them... | |
//maybe: launch(pausingAlwaysFalse) | |
launch { p1.collect { c.value = it || p2.value } } | |
launch { p2.collect { c.value = it || p1.value } } | |
emitAll(c) | |
} | |
} | |
object : CoroutinePausing, StateFlow<Boolean>, Flow<Boolean> by combined { | |
override val isPaused = this | |
override val value: Boolean get() = p2.value || p1.value | |
override val replayCache: List<Boolean> get() = listOf(value) | |
} | |
} | |
} | |
@ExperimentalCoroutinesApi | |
internal fun CoroutineContext.plusMergePausing(isPaused: StateFlow<Boolean>): CoroutineContext = | |
this + mergePausing(this, isPaused) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment