Last active
November 8, 2023 12:31
-
-
Save tomczyn/d8f23c5e313d40c45fef87935c9c14cc to your computer and use it in GitHub Desktop.
StateInMerge - Extension for MutableStateFlow to safely merge flows into single StateFlow for view state
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
/** | |
* Merges the given flows into a new [MutableStateFlow] with the provided initial state, | |
* scope, and launch strategy. | |
* | |
* @param T The type of the state held by the [MutableStateFlow]. | |
* @param scope The [CoroutineScope] in which the merging will occur. | |
* @param launched The launch strategy to use for merging flows. | |
* @param flows A variable number of flows to merge. | |
* @return A new [MutableStateFlow] containing the merged state. | |
*/ | |
fun <T> MutableStateFlow<T>.stateInMerge( | |
scope: CoroutineScope, | |
launched: Launched, | |
vararg flows: StateInMergeContext<T>.() -> Flow<*>, | |
): MutableStateFlow<T> = MutableStateFlowWithStateInMerge( | |
scope = scope, | |
state = this, | |
launched = launched, | |
lambdas = flows, | |
) | |
/** | |
* Interface representing the context for merging states in [MutableStateFlow]. | |
* | |
* @param T The type of the state held by the [MutableStateFlow]. | |
*/ | |
interface StateInMergeContext<T> { | |
val state: MutableStateFlow<T> | |
fun <R> Flow<R>.onEachToState(mapper: (T, R) -> T): Flow<R> | |
} | |
/** | |
* Sealed interface representing the different launch strategies for merging flows. | |
*/ | |
sealed interface Launched { | |
data object Eagerly : Launched | |
data class WhileSubscribed(val stopTimeoutMillis: Long = 0L) : Launched | |
data object Lazily : Launched | |
} | |
@OptIn(ExperimentalCoroutinesApi::class) | |
private class MutableStateFlowWithStateInMerge<T>( | |
private val scope: CoroutineScope, | |
launched: Launched, | |
private val state: MutableStateFlow<T>, | |
lambdas: Array<out StateInMergeContext<T>.() -> Flow<*>>, | |
) : MutableStateFlow<T> by state { | |
private val context: StateInMergeContext<T> = object : StateInMergeContext<T> { | |
override val state: MutableStateFlow<T> | |
get() = this@MutableStateFlowWithStateInMerge | |
override fun <R> Flow<R>.onEachToState(mapper: (T, R) -> T): Flow<R> = | |
onEach { value -> state.update { state -> mapper(state, value) } } | |
} | |
private val flows: List<Flow<*>> = lambdas | |
.map { produceFlow -> produceFlow(context) } | |
init { | |
when (launched) { | |
Launched.Eagerly -> launchAll() | |
Launched.Lazily -> scope.launch { | |
waitForFirstSubscriber() | |
launchAll() | |
} | |
is Launched.WhileSubscribed -> { | |
var jobs: Array<Job> = emptyArray() | |
state.subscriptionCount | |
.map { it > 0 } | |
.distinctUntilChanged() | |
.flatMapLatest { subscribed -> | |
flow<Unit> { | |
when { | |
subscribed && jobs.isEmpty() -> jobs = launchAll() | |
subscribed -> launchInactive(jobs) | |
!subscribed && jobs.isNotEmpty() -> { | |
delay(launched.stopTimeoutMillis) | |
jobs.cancelActive() | |
} | |
} | |
} | |
} | |
.launchIn(scope) | |
} | |
} | |
} | |
private suspend fun waitForFirstSubscriber() { | |
state.subscriptionCount.first { it > 0 } | |
} | |
private fun launchAll(): Array<Job> = flows | |
.map { flow -> flow.launchIn(scope) } | |
.toTypedArray() | |
private fun launchInactive(jobs: Array<Job>) { | |
check(jobs.size == flows.size) | |
jobs.forEachIndexed { index, job -> | |
if (job.isCancelled) jobs[index] = flows[index].launchIn(scope) | |
} | |
} | |
private suspend fun Array<Job>.cancelActive() { | |
forEach { job -> if (job.isActive) job.cancelAndJoin() } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment