Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active October 14, 2020 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fluidsonic/a06419b1c129f3045558c63cbafecca4 to your computer and use it in GitHub Desktop.
Save fluidsonic/a06419b1c129f3045558c63cbafecca4 to your computer and use it in GitHub Desktop.
Creates a cache that hands out "child" MutableStateFlows, each of which does not receive emissions for its own changes, in order to avoid cycles.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/* Output:
cache collected: initial
flow1 (collector 1) collected: initial
flow1 (collector 2) collected: initial
flow2 (collector 1) collected: initial
Updating cache directly…
cache collected: update through cache
flow1 (collector 1) collected: update through cache
flow1 (collector 2) collected: update through cache
flow2 (collector 1) collected: update through cache
Updating cache through flow1…
cache collected: update through flow1
flow2 (collector 1) collected: update through flow1
Updating cache through flow2…
cache collected: update through flow2
flow1 (collector 1) collected: update through flow2
flow1 (collector 2) collected: update through flow2
Fin.
*/
/**
 * Demo: creates a cache plus two non-recursive child flows, attaches printing collectors to all of them,
 * then updates the value through the cache, through `flow1`, and through `flow2` in turn.
 *
 * Each update is followed by a 500 ms pause so the (artificially delayed) collectors can print
 * before the next step is announced. All collectors are cancelled at the end.
 */
suspend fun main(): Unit = coroutineScope {
    val cache = MutableCacheFlow("initial")
    val flow1 = cache.nonRecursive()
    val flow2 = cache.nonRecursive()

    // Prints a step heading framed by one blank line above and one below.
    fun announce(step: String) {
        println()
        println(step)
        println()
    }

    // One collector on the cache itself, two on flow1 and one on flow2.
    launch {
        cache.collect { println("cache collected: $it") }
    }
    launch {
        flow1.collect { delay(5); println("flow1 (collector 1) collected: $it") }
    }
    launch {
        flow1.collect { delay(10); println("flow1 (collector 2) collected: $it") }
    }
    launch {
        flow2.collect { delay(15); println("flow2 (collector 1) collected: $it") }
    }

    delay(500)
    announce("Updating cache directly…")
    cache.value = "update through cache"

    delay(500)
    announce("Updating cache through flow1…")
    flow1.value = "update through flow1"

    delay(500)
    announce("Updating cache through flow2…")
    flow2.value = "update through flow2"

    delay(500)
    println()
    println("Fin.")

    // The collectors never complete on their own; cancel them so coroutineScope can return.
    coroutineContext.cancelChildren()
}
/**
 * Read-only view of a cache: a [StateFlow] of the cached value.
 *
 * Declares no members of its own; it only gives cache flows a distinct type.
 */
public interface CacheFlow<Value> : StateFlow<Value>
/**
 * A [CacheFlow] whose value can also be written, like a [MutableStateFlow].
 *
 * A cache flow is either recursive or non-recursive, see [isRecursive].
 * In the default implementation in this file, collectors of a non-recursive flow are not sent
 * values that were written through that same flow, while collectors of a recursive flow see every value.
 */
public interface MutableCacheFlow<Value> : CacheFlow<Value>, MutableStateFlow<Value> {
/** `true` if this flow is recursive, `false` if it is a non-recursive view. */
public val isRecursive: Boolean
/**
 * Returns a non-recursive flow for the same cache.
 * In the default implementation a recursive flow creates a new view on each call and a non-recursive flow returns itself.
 */
public fun nonRecursive(): MutableCacheFlow<Value>
/**
 * Returns the recursive flow for the same cache.
 * In the default implementation a recursive flow returns itself and a non-recursive flow returns the flow it was created from.
 */
public fun recursive(): MutableCacheFlow<Value>
}
/**
 * Creates a recursive [MutableCacheFlow] that starts out holding [initialValue].
 *
 * The initial state has no origin and the returned flow has no recursive parent.
 */
@Suppress("FunctionName")
public fun <Value> MutableCacheFlow(initialValue: Value): MutableCacheFlow<Value> {
    val initialState = CacheState<Value>(origin = null, value = initialValue)

    return DefaultCacheFlow(
        underlyingFlow = MutableStateFlow(initialState),
        isRecursive = true,
        recursiveParentFlow = null,
    )
}
/**
 * Collects this flow in [scope] and exposes its values through a [MutableCacheFlow].
 *
 * Suspends until the upstream emits its first value; that value becomes the initial value of the
 * returned cache. Every later upstream value is written to the cache's `value`.
 *
 * If the collecting coroutine ends before a first value was emitted, this call fails instead of
 * suspending forever:
 * - the upstream completed without emitting: throws [NoSuchElementException];
 * - the upstream failed: throws the upstream's exception;
 * - [scope] was cancelled (possibly before collection even started): throws the
 *   [CancellationException] the collecting job was cancelled with.
 *
 * @param scope the scope in which the upstream is collected for as long as it keeps emitting.
 * @return the cache, once the first upstream value is available.
 */
public suspend fun <Value> Flow<Value>.cacheIn(scope: CoroutineScope): MutableCacheFlow<Value> {
    val deferredCache = CompletableDeferred<MutableCacheFlow<Value>>()
    val upstream = this

    val collection = scope.launch {
        var cache: MutableCacheFlow<Value>? = null
        upstream.collect { value ->
            val existing = cache
            if (existing != null) {
                existing.value = value
            } else {
                // First emission: create the cache and release the suspended caller.
                cache = MutableCacheFlow(value).also { deferredCache.complete(it) }
            }
        }
    }

    // Runs whenever the collecting job ends, including when it never got to start because [scope]
    // was already cancelled. If the cache was already delivered, completeExceptionally is a no-op.
    collection.invokeOnCompletion { cause ->
        deferredCache.completeExceptionally(
            cause ?: NoSuchElementException("Upstream flow completed without emitting a value."),
        )
    }

    return deferredCache.await()
}
/**
 * Creates a [MutableCacheFlow] holding [initialValue] and feeds it from this flow.
 *
 * A coroutine launched in [scope] shares this flow via `shareIn(scope, started)` and emits every
 * shared value into the returned cache. The cache is returned immediately, without waiting for the
 * upstream.
 *
 * NOTE(review): the coroutine launched here subscribes to the shared flow right away and stays
 * subscribed, so subscriber-count-based [started] strategies presumably never observe zero
 * subscribers. Confirm that this is intended.
 *
 * @param scope the scope used both for sharing the upstream and for forwarding values into the cache.
 * @param started the sharing strategy passed to `shareIn`.
 * @param initialValue the cache's value until the upstream emits.
 */
public fun <Value> Flow<Value>.cacheIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: Value,
): MutableCacheFlow<Value> =
    MutableCacheFlow(initialValue = initialValue).also { cache ->
        scope.launch {
            // Sharing is set up inside the coroutine, so nothing starts if the launch never runs.
            val shared = this@cacheIn.shareIn(scope, started)
            cache.emitAll(shared)
        }
    }
/**
 * A cached [value] together with the flow it was written through ([origin]; `null` for the
 * cache's initial value).
 *
 * Equality and hash code look at [value] only; [origin] is deliberately ignored, so two states
 * written through different flows are equal whenever their values are equal.
 */
private class CacheState<Value>(
    val origin: MutableCacheFlow<Value>?,
    val value: Value,
) {

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is CacheState<*>) return false

        return value == other.value
    }

    override fun hashCode(): Int = value.hashCode()
}
/**
 * [MutableCacheFlow] backed by a [MutableStateFlow] of [CacheState]s.
 *
 * Every write made through this flow is wrapped in a [CacheState] whose origin is this instance.
 * Recursive and non-recursive views of the same cache share one [underlyingFlow]; a non-recursive
 * view hides states that originate from itself from its own collectors.
 *
 * @param underlyingFlow the shared state holder of the cache.
 * @param isRecursive whether collectors of this flow also receive values written through this flow.
 * @param recursiveParentFlow the recursive flow this view was created from; `null` for a recursive flow.
 */
private class DefaultCacheFlow<Value>(
    private val underlyingFlow: MutableStateFlow<CacheState<Value>>,
    override val isRecursive: Boolean,
    private val recursiveParentFlow: MutableCacheFlow<Value>?,
) : AbstractFlow<Value>(), MutableCacheFlow<Value> {

    // --- state access ---

    override var value: Value
        get() = underlyingFlow.value.value
        set(value) {
            underlyingFlow.value = stateOf(value)
        }

    override val replayCache: List<Value>
        get() = underlyingFlow.replayCache.map { state -> state.value }

    override val subscriptionCount
        get() = underlyingFlow.subscriptionCount

    // --- collecting ---

    override suspend fun collectSafely(collector: FlowCollector<Value>) {
        val visibleValues = underlyingFlow
            .filter { state -> isVisibleToCollectors(state) }
            .map { state -> state.value }

        collector.emitAll(visibleValues)
    }

    /** A state is hidden only from collectors of the non-recursive flow it was written through. */
    private fun isVisibleToCollectors(state: CacheState<Value>): Boolean =
        isRecursive || state.origin !== this

    // --- writing ---

    override fun compareAndSet(expect: Value, update: Value): Boolean {
        val expectedState = stateOf(expect)
        val updatedState = stateOf(update)

        return underlyingFlow.compareAndSet(expect = expectedState, update = updatedState)
    }

    override suspend fun emit(value: Value) {
        underlyingFlow.emit(stateOf(value))
    }

    override fun tryEmit(value: Value): Boolean =
        underlyingFlow.tryEmit(stateOf(value))

    override fun resetReplayCache() {
        underlyingFlow.resetReplayCache()
    }

    /** Wraps [value] in a state that records this flow as its origin. */
    private fun stateOf(value: Value): CacheState<Value> =
        CacheState(origin = this, value = value)

    // --- switching between recursive and non-recursive views ---

    override fun nonRecursive(): MutableCacheFlow<Value> =
        if (isRecursive)
            DefaultCacheFlow(underlyingFlow = underlyingFlow, isRecursive = false, recursiveParentFlow = this)
        else
            this

    override fun recursive(): MutableCacheFlow<Value> =
        if (isRecursive) this else checkNotNull(recursiveParentFlow)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment