@fluidsonic
Created October 11, 2020 15:36
Rudimentary Flow.shareIn with WhenSubscribed behavior until the official operator is launched
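import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.broadcastIn
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

fun <T> Flow<T>.shareIn(scope: CoroutineScope): Flow<T> {
    val upstream = this
    var broadcastChannel: BroadcastChannel<T>? = null
    val mutex = Mutex() // guards broadcastChannel and subscriberCount
    var subscriberCount = 0

    return flow {
        // Register this collector; the first one creates the shared upstream broadcast.
        val activeChannel = mutex.withLock {
            subscriberCount += 1
            broadcastChannel ?: upstream.broadcastIn(scope).also {
                broadcastChannel = it
            }
        }

        try {
            activeChannel.consumeEach { emit(it) }
        } finally {
            // NonCancellable: a cancelled collector must still be able to wait for the lock.
            withContext(NonCancellable) {
                mutex.withLock {
                    subscriberCount -= 1
                    // The last collector to leave stops the upstream collection.
                    if (subscriberCount == 0) {
                        broadcastChannel?.cancel()
                        broadcastChannel = null
                    }
                }
            }
        }
    }
}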
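
For comparison, here is a minimal sketch of the same "share while there are subscribers" idea, written against the official operator that shipped in kotlinx.coroutines 1.4.0: `shareIn(scope, started, replay)` with `SharingStarted.WhileSubscribed()`. The helper name `collectTwoSubscribers`, the counting upstream, and the 10 ms delay are made up for illustration. The official operator is backed by a `SharedFlow` rather than a `BroadcastChannel`, so details such as buffering and completion handling may differ from the snippet above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo helper: runs two concurrent collectors against one shared flow.
suspend fun collectTwoSubscribers(): Pair<List<Int>, List<Int>> = coroutineScope {
    // Separate scope that owns the sharing coroutine; cancelled once the demo is done.
    val sharingScope = CoroutineScope(coroutineContext + SupervisorJob())

    // Infinite counting upstream (illustrative only).
    val upstream = flow {
        var n = 0
        while (true) {
            emit(n++)
            delay(10)
        }
    }

    // Upstream is collected only while at least one subscriber is present.
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed())

    // Each collector takes three values and then unsubscribes.
    val first = async { shared.take(3).toList() }
    val second = async { shared.take(3).toList() }
    val result = first.await() to second.await()

    sharingScope.cancel()
    result
}

fun main() = runBlocking {
    val (first, second) = collectTwoSubscribers()
    println(first)
    println(second)
}
```

With the default `replay = 0`, a collector only sees values emitted after it subscribed, so the two lists are not guaranteed to start at the same number.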