Last active
October 14, 2020 17:12
-
-
Save fluidsonic/a06419b1c129f3045558c63cbafecca4 to your computer and use it in GitHub Desktop.
Creates a cache backed by "child" MutableStateFlows. A child flow does not receive the values emitted for its own changes, which avoids update cycles.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
/* Output: | |
cache collected: initial | |
flow1 (collector 1) collected: initial | |
flow1 (collector 2) collected: initial | |
flow2 (collector 1) collected: initial | |
Updating cache directly… | |
cache collected: update through cache | |
flow1 (collector 1) collected: update through cache | |
flow1 (collector 2) collected: update through cache | |
flow2 (collector 1) collected: update through cache | |
Updating cache through flow1… | |
cache collected: update through flow1 | |
flow2 (collector 1) collected: update through flow1 | |
Updating cache through flow2… | |
cache collected: update through flow2 | |
flow1 (collector 1) collected: update through flow2 | |
flow1 (collector 2) collected: update through flow2 | |
Fin. | |
*/ | |
/**
 * Demo driver: builds a cache with two non-recursive child flows, attaches collectors to
 * all of them, then updates the value through the cache and through each child in turn,
 * printing what every collector observes.
 */
suspend fun main(): Unit = coroutineScope {
    // Prints a step title surrounded by blank lines.
    fun banner(title: String) {
        println()
        println(title)
        println()
    }

    val cache = MutableCacheFlow("initial")
    val flow1 = cache.nonRecursive()
    val flow2 = cache.nonRecursive()

    // Each collector runs in its own child coroutine of this scope.
    cache.onEach { println("cache collected: $it") }.launchIn(this)
    flow1.onEach { delay(5); println("flow1 (collector 1) collected: $it") }.launchIn(this)
    flow1.onEach { delay(10); println("flow1 (collector 2) collected: $it") }.launchIn(this)
    flow2.onEach { delay(15); println("flow2 (collector 1) collected: $it") }.launchIn(this)

    delay(500)
    banner("Updating cache directly…")
    cache.value = "update through cache"

    delay(500)
    banner("Updating cache through flow1…")
    flow1.value = "update through flow1"

    delay(500)
    banner("Updating cache through flow2…")
    flow2.value = "update through flow2"

    delay(500)
    println()
    println("Fin.")

    // The collectors never complete on their own; cancel them so coroutineScope can return.
    coroutineContext.cancelChildren()
}
/**
 * Read-only view of a cache: a [StateFlow] of the cached [Value].
 *
 * Declares no members of its own; it only gives cache flows a distinct type.
 */
public interface CacheFlow<Value> : StateFlow<Value>
/**
 * A [CacheFlow] whose value can also be written, and from which recursive and
 * non-recursive views can be obtained.
 */
public interface MutableCacheFlow<Value> : CacheFlow<Value>, MutableStateFlow<Value> {

    /** Whether this instance is a recursive view (see [recursive] and [nonRecursive]). */
    public val isRecursive: Boolean

    /** Returns a recursive view of this cache. */
    public fun recursive(): MutableCacheFlow<Value>

    /** Returns a non-recursive view of this cache. */
    public fun nonRecursive(): MutableCacheFlow<Value>
}
/**
 * Creates a new recursive [MutableCacheFlow] that starts out holding [initialValue].
 *
 * The initial state has no origin, and the returned flow has no recursive parent.
 */
@Suppress("FunctionName")
public fun <Value> MutableCacheFlow(initialValue: Value): MutableCacheFlow<Value> {
    val initialState = CacheState<Value>(origin = null, value = initialValue)

    return DefaultCacheFlow(
        underlyingFlow = MutableStateFlow(initialState),
        isRecursive = true,
        recursiveParentFlow = null,
    )
}
/**
 * Collects this flow in [scope] and mirrors its values into a [MutableCacheFlow].
 *
 * Suspends until the upstream flow emits its first value, which becomes the cache's
 * initial value. Every later upstream value is written to the same cache.
 *
 * @param scope the scope in which the upstream flow is collected.
 * @return the cache, once the first upstream value has arrived.
 * @throws NoSuchElementException if the upstream flow completes without emitting a value.
 * @throws Throwable the failure (or cancellation) of the collecting coroutine if it ends
 *   before the first value was emitted.
 */
public suspend fun <Value> Flow<Value>.cacheIn(scope: CoroutineScope): MutableCacheFlow<Value> {
    val deferredCache = CompletableDeferred<MutableCacheFlow<Value>>()
    val upstream = this

    val collectorJob = scope.launch {
        var cache: MutableCacheFlow<Value>? = null
        upstream.collect { value ->
            val existingCache = cache
            if (existingCache != null) {
                existingCache.value = value
            } else {
                // First emission: create the cache and release the suspended caller.
                cache = MutableCacheFlow(value).also { deferredCache.complete(it) }
            }
        }
    }

    // Without this the caller would wait forever if the job ends before a first value was
    // emitted (upstream empty, upstream failed, scope cancelled, or job never started).
    // If the deferred has already been completed with a cache, this call is a no-op.
    collectorJob.invokeOnCompletion { cause ->
        deferredCache.completeExceptionally(
            cause ?: NoSuchElementException("Upstream flow completed without emitting a value.")
        )
    }

    return deferredCache.await()
}
/**
 * Creates a [MutableCacheFlow] that starts with [initialValue] and is fed from this flow.
 *
 * Upstream collection runs in [scope] and is started and stopped according to [started],
 * evaluated against the subscription count of the returned cache. The upstream is
 * collected directly, so no value is lost between the start of collection and the first
 * write into the cache.
 *
 * @param scope the scope in which the upstream flow is collected.
 * @param started the strategy that controls when the upstream flow is collected.
 * @param initialValue the value of the cache before the upstream emits; also restored
 *   when [started] issues [SharingCommand.STOP_AND_RESET_REPLAY_CACHE].
 * @return the cache, available immediately.
 */
public fun <Value> Flow<Value>.cacheIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: Value,
): MutableCacheFlow<Value> {
    val upstream = this
    val cache = MutableCacheFlow(initialValue = initialValue)

    scope.launch {
        started.command(cache.subscriptionCount)
            // Ignore any stop commands that precede the very first start.
            .dropWhile { it != SharingCommand.START }
            .distinctUntilChanged()
            // collectLatest cancels the running upstream collection when a new command arrives.
            .collectLatest { command ->
                when (command) {
                    SharingCommand.START -> cache.emitAll(upstream)
                    SharingCommand.STOP -> Unit
                    SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> cache.value = initialValue
                }
            }
    }

    return cache
}
/**
 * A cached [value] together with the flow instance ([origin]) through which it was set.
 *
 * Equality and hash code are based on [value] only; [origin] is ignored, so two states
 * holding equal values are equal regardless of who set them.
 */
private class CacheState<Value>(
    val origin: MutableCacheFlow<Value>?,
    val value: Value,
) {

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is CacheState<*>) return false
        return value == other.value
    }

    override fun hashCode(): Int {
        return value.hashCode()
    }
}
/**
 * [MutableCacheFlow] implementation that is a view onto a shared [MutableStateFlow] of
 * [CacheState]s.
 *
 * Every write made through an instance is stored as a [CacheState] whose origin is that
 * instance. A recursive instance delivers every state to its collectors; a non-recursive
 * instance skips the states it wrote itself.
 *
 * @param underlyingFlow the state flow shared by a cache and all views derived from it.
 * @param isRecursive whether collectors of this instance also receive its own writes.
 * @param recursiveParentFlow the recursive instance this view was derived from, or `null`
 *   for an instance that was not created by [nonRecursive].
 */
private class DefaultCacheFlow<Value>(
    private val underlyingFlow: MutableStateFlow<CacheState<Value>>,
    override val isRecursive: Boolean,
    private val recursiveParentFlow: MutableCacheFlow<Value>?,
) : AbstractFlow<Value>(), MutableCacheFlow<Value> {

    // --- state access -------------------------------------------------------------------

    override var value: Value
        get() = underlyingFlow.value.value
        set(newValue) {
            underlyingFlow.value = newState(newValue)
        }

    override val replayCache: List<Value>
        get() = underlyingFlow.replayCache.map { state -> state.value }

    override val subscriptionCount: StateFlow<Int>
        get() = underlyingFlow.subscriptionCount

    // --- collecting ---------------------------------------------------------------------

    override suspend fun collectSafely(collector: FlowCollector<Value>) {
        // A non-recursive view hides the states that were written through itself.
        val visibleStates: Flow<CacheState<Value>> =
            if (isRecursive) underlyingFlow
            else underlyingFlow.filter { state -> state.origin !== this }

        collector.emitAll(visibleStates.map { state -> state.value })
    }

    // --- writing ------------------------------------------------------------------------

    // CacheState equality ignores the origin, so the expected state is matched by value.
    override fun compareAndSet(expect: Value, update: Value): Boolean =
        underlyingFlow.compareAndSet(expect = newState(expect), update = newState(update))

    override suspend fun emit(value: Value) {
        underlyingFlow.emit(newState(value))
    }

    override fun tryEmit(value: Value): Boolean =
        underlyingFlow.tryEmit(newState(value))

    override fun resetReplayCache() {
        underlyingFlow.resetReplayCache()
    }

    // --- views --------------------------------------------------------------------------

    override fun recursive(): MutableCacheFlow<Value> =
        if (isRecursive) this else checkNotNull(recursiveParentFlow)

    override fun nonRecursive(): MutableCacheFlow<Value> =
        if (isRecursive) {
            DefaultCacheFlow(underlyingFlow = underlyingFlow, isRecursive = false, recursiveParentFlow = this)
        } else {
            this
        }

    /** Wraps [value] in a state that records this instance as its origin. */
    private fun newState(value: Value) =
        CacheState(origin = this, value = value)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment