Skip to content

Instantly share code, notes, and snippets.

@RBusarow
Last active February 17, 2020 05:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RBusarow/0854e5f06e2875cf67748d216f0f1cb7 to your computer and use it in GitHub Desktop.
Save RBusarow/0854e5f06e2875cf67748d216f0f1cb7 to your computer and use it in GitHub Desktop.
Resettable lazy version of a shared Flow, which allows for multiple observers to consume a single emitter, with an automatic reset after the last consumer stops and an automatic resume when another consumer starts.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
fun <T> Flow<T>.shareIn(
scope: CoroutineScope
): Flow<T> = SharedFlow(
BroadcastManager(
this,
scope
)
)
internal class SharedFlow<T>(
private val broadcastManager: BroadcastManager<T>
) : Flow<T> {
internal var count = 0
override suspend fun collect(
collector: FlowCollector<T>
) = collector.emitAll(createFlow())
internal suspend fun createFlow(): Flow<T> = broadcastManager.getChannel()
.asFlow()
.onStart { broadcastManager.onNewConsumer() }
.onCompletion { broadcastManager.onConsumeEnd() }
}
internal class BroadcastManager<T>(
private val sourceFlow: Flow<T>,
private val scope: CoroutineScope
) {
private var count = 0
private val mutex = Mutex(false)
private var channelRef: BroadcastChannel<T> = createChannel()
private fun createChannel(): BroadcastChannel<T> = sourceFlow
.broadcastIn(scope)
private fun reset() {
channelRef = createChannel()
}
suspend fun onNewConsumer() = mutex.withLock { count++ }
suspend fun onConsumeEnd() = mutex.withLock { if (--count == 0) reset() }
suspend fun getChannel(): BroadcastChannel<T> = mutex.withLock { channelRef }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment