Skip to content

Instantly share code, notes, and snippets.

@streetsofboston
Last active January 26, 2021 10:51
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save streetsofboston/39c3f8c882c9891b35e0ebc3cd812381 to your computer and use it in GitHub Desktop.
Save streetsofboston/39c3f8c882c9891b35e0ebc3cd812381 to your computer and use it in GitHub Desktop.
Initial implementation of a 'shared' coroutine Flow
package connectableflow
import androidx.collection.CircularArray
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
* A [Flow] of type [T] that only starts emitting values after its [connect] method is called.
*
* Calling [collect] on this flow does *not* start the emissions by itself. Only after
* [connect] is called, the emission and actual collecting of values starts.
*
* If this flow's [Connection] is still connected, the current [Connection] will be returned when
* [connect] is called and the flow will not be restarted.
*/
interface ConnectableFlow<out T> : Flow<T> {
/**
* Connects this shared [Flow] to (re-)start collecting values.
*
* @param scope The [CoroutineScope] in which the shared emissions will take place.
* @return The [Connection] that can be closed to stop this shared [Flow].
*/
fun connect(scope: CoroutineScope): Connection
}
/**
* A connection returned by a call to [ConnectableFlow.connect].
*
* Both members are suspending functions, so they can only be called from a coroutine.
*/
interface Connection {
/**
* Returns true if this connection is connected and active.
*
* @return `true` while the connection is connected and active, `false` otherwise.
*/
suspend fun isConnected(): Boolean
/**
* Closes this connection in an orderly fashion.
*/
suspend fun close()
}
/**
 * Wraps this upstream [Flow] of type [T] in a [ConnectableFlow] so that it can be shared
 * between collectors.
 *
 * The upstream [Flow] is not collected until [ConnectableFlow.connect] has been called on the
 * returned flow.
 *
 * @return A [ConnectableFlow] that represents the shared [Flow] of this receiver.
 */
fun <T> Flow<T>.publish(): ConnectableFlow<T> {
    return PublishConnectableFlow(upStream = this)
}
/**
 * Wraps this upstream [Flow] of type [T] in a [ConnectableFlow] that keeps the most recently
 * emitted values and hands them to a collector when it starts collecting.
 *
 * A [bufferSize] of exactly 1 selects the single-value implementation; any other value selects
 * the implementation backed by a buffer of [bufferSize] elements.
 *
 * @param bufferSize The number of most recent values to keep for new collectors.
 * @return A [ConnectableFlow] that represents the shared, replaying [Flow] of this receiver.
 */
fun <T> Flow<T>.replay(bufferSize: Int = 1): ConnectableFlow<T> = when (bufferSize) {
    1 -> SingleReplayConnectableFlow(this)
    else -> ReplayConnectableFlow(this, bufferSize)
}
/**
 * Creates a reference-counted [Flow] of type [T] on top of this [ConnectableFlow].
 *
 * The returned flow calls [ConnectableFlow.connect] once [numberOfCollectors] collectors have
 * started collecting (i.e. called [Flow.collect]) and closes the connection again when the last
 * collector stops collecting.
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @param numberOfCollectors The number of collectors that need to start collecting before the
 * connection (re)starts.
 * @return A shared reference-counted [Flow].
 */
fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T> {
    return RefCountFlow(upStream = this, scope = scope, numberOfCollectors = numberOfCollectors)
}
/**
 * Creates a [Flow] of type [T] on top of this [ConnectableFlow] that automatically connects
 * (i.e. calls [ConnectableFlow.connect]) when at least [numberOfCollectors] collectors have
 * started collecting.
 *
 * This [ConnectableFlow] will never be automatically canceled and it will never be restarted.
 * Use [ConnectableFlow.refCount] if you need automatic cancellation and restart.
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @param numberOfCollectors The number of collectors that need to start collecting before the
 * connection starts.
 * @return A shared connection-counted [Flow].
 */
fun <T> ConnectableFlow<T>.autoConnect(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T> {
    return AutoConnectFlow(upStream = this, scope = scope, numberOfCollectors = numberOfCollectors)
}
/**
 * Shares this [Flow] of type [T] between multiple collectors, so that it is not restarted for
 * each collector that starts collecting.
 *
 * This is [Flow.publish] followed by [ConnectableFlow.refCount] with its default collector count.
 *
 * @param scope The scope in which the underlying [ConnectableFlow] will be connected.
 * @return A new [Flow] that shares this [Flow].
 */
fun <T> Flow<T>.share(scope: CoroutineScope): Flow<T> {
    val published = publish()
    return published.refCount(scope)
}
/**
 * Caches the last [cacheSize] elements of this [Flow] of type [T].
 *
 * A collector that starts collecting while this cached [Flow] is emitting is handed the last
 * [cacheSize] elements emitted so far. This is [Flow.replay] followed by
 * [ConnectableFlow.autoConnect] with its default collector count.
 *
 * @param scope The scope in which the underlying [ConnectableFlow] will be connected.
 * @param cacheSize The size of the cache.
 * @return A new [Flow] that caches the last [cacheSize] emissions of this [Flow].
 */
fun <T> Flow<T>.cache(scope: CoroutineScope, cacheSize: Int = 1): Flow<T> {
    val replayed = replay(cacheSize)
    return replayed.autoConnect(scope)
}
/**
* Base implementation of [ConnectableFlow] that fans the values of a single [upStream] collection
* out to every currently registered collector.
*
* Subclasses can hook into the life-cycle through [onCollect], [onEmit] and [onCloseConnection];
* the default implementations of these hooks do nothing.
*
* @param upStream The flow that is collected once per connection and shared with all collectors.
*/
private abstract class ConnectableFlowImpl<T>(private val upStream: Flow<T>) : ConnectableFlow<T> {
// Currently registered collectors. Every access goes through runSync/copySync, which
// synchronize on this list instance.
private val collectors: MutableList<CollectorInfo<T>> = mutableListOf()
// The one and only connection object; every call to connect() returns this same instance.
private val connection: ConnectionImpl = ConnectionImpl()
/**
* Registers [collector] and suspends until the shared upstream collection completes, fails or
* the caller is cancelled. Does not start the upstream collection itself.
*
* NOTE(review): [onCollect] runs before the collector is added to [collectors]; whether values
* emitted in between can be missed by (or replayed twice to) the collector should be confirmed.
*/
override suspend fun collect(collector: FlowCollector<in T>) = coroutineScope {
// Give subclasses a chance to act (e.g. replay) in the scope of this collection.
onCollect(this, collector)
suspendCancellableCoroutine<Unit> { cont ->
// `this` is the CoroutineScope created by coroutineScope above; notifyCollectors launches
// the collector's emissions in it.
val observerInfo = CollectorInfo(collector, this, cont)
// Unregister the collector when the suspended caller gets cancelled.
cont.invokeOnCancellation {
collectors.runSync {
this -= observerInfo
}
}
collectors.runSync {
this += observerInfo
}
}
}
/**
* Asks [connection] to start the upstream collection, unless it already holds an active job,
* and returns [connection].
*
* The start-up itself happens asynchronously in a coroutine launched in [scope] by
* [ConnectionImpl.fetchJob].
*/
override fun connect(scope: CoroutineScope): Connection {
return connection.apply {
// `this` inside the lambda is the CoroutineScope that fetchJob supplies to it.
scope.fetchJob { startConnection(this) }
}
}
/** Hook called at the start of every [collect] call, before the collector is registered. */
protected open fun onCollect(collectionScope: CoroutineScope, collector: FlowCollector<in T>) {}
/** Hook called for every upstream value, before it is forwarded to the collectors. */
protected open fun onEmit(value: T) {}
/** Hook called when the upstream collection has ended, after the collectors were cleared. */
protected open fun onCloseConnection(connectionScope: CoroutineScope) {}
/**
* Launches the collection of [upStream] in [connectionScope] and returns its [Job].
*
* On normal completion all registered collectors are resumed; on any [Throwable] (the catch
* clause also covers cancellation) they are resumed with that throwable. In both cases the
* collector list is cleared and [onCloseConnection] is called afterwards.
*/
private fun startConnection(connectionScope: CoroutineScope): Job {
return connectionScope.launch {
try {
upStream.collect(::notifyCollectors)
completeCollection()
} catch (e: Throwable) {
failCollection(e)
} finally {
collectors.runSync { clear() }
onCloseConnection(connectionScope)
}
}
}
/**
* Forwards [value] to a snapshot of the registered collectors and suspends until each of them
* has finished processing it.
*/
private suspend fun notifyCollectors(value: T) {
onEmit(value)
val jobs = collectors.copySync().map { (collector, collectionScope, _) ->
// Switch to the scope of the downstream collector.
collectionScope.launch {
collector.emit(value)
}
}
jobs.joinAll()
}
/** Resumes every registered collector normally, which makes its [collect] call return. */
private fun completeCollection() {
collectors.copySync().forEach { (_, _, cont) ->
cont.resume(Unit)
}
}
/** Resumes every registered collector with [e], which makes its [collect] call throw [e]. */
private fun failCollection(e: Throwable) {
collectors.copySync().forEach { (_, _, cont) ->
cont.resumeWithException(e)
}
}
}
/**
* [ConnectableFlow] that uses [ConnectableFlowImpl] as-is: it overrides none of its hooks.
*/
private class PublishConnectableFlow<T>(upStream: Flow<T>) : ConnectableFlowImpl<T>(upStream)
/**
 * [ConnectableFlow] that remembers the most recently emitted value and emits it to every
 * collector that starts collecting.
 *
 * The remembered value is dropped when the connection closes.
 *
 * @param upStream The flow that is shared with all collectors.
 */
private class SingleReplayConnectableFlow<T>(upStream: Flow<T>) : ConnectableFlowImpl<T>(upStream) {
    // The reference holder itself is never replaced, only its content; hence `val`.
    private val buffer = AtomicReference<Optional<T>>(None)

    /** Emits the remembered value, if any, to [collector] from a coroutine in [collectionScope]. */
    override fun onCollect(collectionScope: CoroutineScope, collector: FlowCollector<in T>) {
        collectionScope.launch {
            // Both subtypes of the sealed Optional are listed so that the `when` is exhaustive.
            when (val optional = buffer.get()) {
                is Some -> collector.emit(optional.value)
                is None -> Unit // Nothing has been emitted yet; nothing to replay.
            }
        }
    }

    /** Remembers [value] as the most recent emission. */
    override fun onEmit(value: T) {
        buffer.set(Some(value))
    }

    /** Forgets the remembered value. */
    override fun onCloseConnection(connectionScope: CoroutineScope) {
        buffer.set(None)
    }
}
/**
 * [ConnectableFlow] that remembers up to [bufferSize] of the most recently emitted values and
 * emits them, oldest first, to every collector that starts collecting.
 *
 * The remembered values are dropped when the connection closes. All access to the buffer is
 * synchronized on the buffer instance.
 *
 * @param upStream The flow that is shared with all collectors.
 * @param bufferSize The maximum number of values that are remembered.
 */
private class ReplayConnectableFlow<T>(
    upStream: Flow<T>,
    private val bufferSize: Int
) : ConnectableFlowImpl<T>(upStream) {
    private val buffer = CircularArray<T>(bufferSize)

    /** Emits a snapshot of the buffer to [collector] from a coroutine in [collectionScope]. */
    override fun onCollect(collectionScope: CoroutineScope, collector: FlowCollector<in T>) {
        collectionScope.launch {
            // The snapshot is taken when this coroutine runs, not when onCollect is called.
            for (remembered in snapshot()) {
                collector.emit(remembered)
            }
        }
    }

    /** Appends [value] to the buffer, dropping the oldest value first when the buffer is full. */
    override fun onEmit(value: T) {
        synchronized(buffer) {
            if (buffer.size() == bufferSize) {
                buffer.popFirst()
            }
            buffer.addLast(value)
        }
    }

    /** Empties the buffer. */
    override fun onCloseConnection(connectionScope: CoroutineScope) {
        synchronized(buffer) { buffer.clear() }
    }

    /** Returns the current content of the buffer, oldest value first. */
    private fun snapshot(): List<T> = synchronized(buffer) {
        List(buffer.size()) { index -> buffer.get(index) }
    }
}
/**
 * [Flow] that connects [upStream] when the number of active collectors reaches
 * [numberOfCollectors] and closes the connection again when that number drops back to zero.
 *
 * @param upStream The connectable flow whose connection is managed by this flow.
 * @param scope The scope in which [upStream] is connected and in which the bookkeeping runs.
 * @param numberOfCollectors The number of active collectors at which [upStream] gets connected.
 */
private class RefCountFlow<T>(
    private val upStream: ConnectableFlow<T>,
    private val scope: CoroutineScope,
    private val numberOfCollectors: Int
) : Flow<T> {
    private val refCount = AtomicInteger(0)

    // Stays null until the collector count has reached numberOfCollectors for the first time.
    // Nullable rather than lateinit: with numberOfCollectors > 1 the count can drop back to
    // zero before any connection was ever made. Only accessed while holding connectionMutex.
    private var connection: Connection? = null
    private val connectionMutex = Mutex()

    /**
     * Counts the caller in, collects [upStream] into [collector], and counts the caller out
     * again when the collection ends for whatever reason.
     */
    override suspend fun collect(collector: FlowCollector<in T>) {
        scope.launch {
            connectionMutex.withLock {
                if (refCount.incrementAndGet() == numberOfCollectors) {
                    connection = upStream.connect(scope)
                }
            }
        }.join()
        try {
            upStream.collect(collector)
        } finally {
            // The bookkeeping is launched in `scope` rather than run inline, so it is not part
            // of the (possibly cancelled) collecting coroutine.
            scope.launch {
                connectionMutex.withLock {
                    if (refCount.decrementAndGet() == 0) {
                        connection?.close()
                        connection = null
                    }
                }
            }.join()
        }
    }
}
/**
 * [Flow] that connects [upStream] once, when the number of collectors that have started
 * collecting reaches [numberOfCollectors]. The connection is never closed by this flow.
 *
 * When [numberOfCollectors] is zero or negative, [upStream] is connected right away from a
 * coroutine launched in [scope] at construction time.
 *
 * @param upStream The connectable flow whose connection is started by this flow.
 * @param scope The scope in which [upStream] is connected and in which the bookkeeping runs.
 * @param numberOfCollectors The number of collectors at which [upStream] gets connected.
 */
private class AutoConnectFlow<T>(
    private val upStream: ConnectableFlow<T>,
    private val scope: CoroutineScope,
    private val numberOfCollectors: Int
) : Flow<T> {
    private val refCount = AtomicInteger(0)

    // Only accessed while holding connectionMutex.
    private var connection: Connection? = null
    private val connectionMutex = Mutex()

    init {
        if (numberOfCollectors <= 0) {
            scope.launch {
                connectionMutex.withLock {
                    connection = upStream.connect(this@launch)
                }
            }
        }
    }

    /**
     * Counts the caller in, connects [upStream] if the caller is the [numberOfCollectors]-th
     * collector, and collects [upStream] into [collector] unless an earlier connection has
     * already ended.
     */
    override suspend fun collect(collector: FlowCollector<in T>) = coroutineScope {
        val upStreamHasTerminated = scope.async {
            connectionMutex.withLock {
                if (refCount.incrementAndGet() == numberOfCollectors) {
                    // This collector triggers the connection. A connection that was requested
                    // just now cannot be treated as terminated: it may simply not have been
                    // started yet, in which case isConnected() would still report false.
                    connection = upStream.connect(scope)
                    false
                } else {
                    // No connection yet (null) means "not terminated"; an existing connection
                    // that is no longer connected means the upstream has ended.
                    connection?.isConnected() == false
                }
            }
        }.await()
        if (!upStreamHasTerminated) {
            upStream.collect(collector)
        }
    }
}
/**
 * [Connection] backed by a single, replaceable [Job]. Every access to the job happens while
 * holding [jobMutex].
 */
private class ConnectionImpl : Connection {
    private val jobMutex = Mutex()
    private var job: Job? = null

    // False when there is no job at all.
    private val isJobActive: Boolean
        get() = job?.isActive == true

    /** Returns whether there currently is a job and that job is active. */
    override suspend fun isConnected(): Boolean = jobMutex.withLock { isJobActive }

    /** Cancels the current job, if any, waits for it to finish and then forgets it. */
    override suspend fun close() {
        jobMutex.withLock {
            job?.cancelAndJoin()
            job = null
        }
    }

    /**
     * Launches a coroutine in the receiving scope that obtains a new job from [getJob], unless
     * the current job is still active.
     *
     * @param getJob Creates the job; its receiver is the scope of the launched coroutine.
     */
    fun CoroutineScope.fetchJob(getJob: suspend CoroutineScope.() -> Job) {
        launch {
            jobMutex.withLock {
                if (isJobActive) return@withLock
                job = getJob()
            }
        }
    }
}
/**
* Everything that is kept per registered collector. Instances are destructured by position,
* so the order of the properties matters.
*
* @property collector The downstream collector that receives the shared values.
* @property scope The scope in which values are emitted to [collector].
* @property cont The continuation that is resumed to end the collector's `collect` call.
*/
private data class CollectorInfo<T>(
val collector: FlowCollector<in T>,
val scope: CoroutineScope,
val cont: CancellableContinuation<Unit>
)
/**
 * Runs [block] with the receiver as `this` while holding the receiver's monitor and returns
 * the result of [block].
 */
private inline fun <A : Any, B> A.runSync(block: A.() -> B): B {
    return synchronized(this) { this.block() }
}

/** Returns a copy of this list that is made while holding the list's monitor. */
private inline fun <A> List<A>.copySync(): List<A> = synchronized(this) { toList() }
/** A value of type [T] that is either present ([Some]) or absent ([None]). */
private sealed class Optional<out T>
/** An [Optional] that holds [value]. */
private class Some<out T>(val value: T) : Optional<T>()
/** The [Optional] that holds no value. */
private object None : Optional<Nothing>()
package connectableflow
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowViaChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
// Wall-clock time in milliseconds at which the current use-case started. Each use-case sets it
// first; the timestamps that are printed and emitted are relative to it.
var startTime = 0L
// Situation:
// Resource that is somewhat or very expensive (depending on [cost]) to start and shut down.
// Connecting to this resource is always expensive. Connections should be shared.
/**
 * Demo resource that, while running, repeatedly sends the number of milliseconds elapsed since
 * [startTime] (as a string) to each registered listener from a background thread.
 *
 * @param cost The time in milliseconds that the background thread sleeps on start-up and again
 * on shut-down, to mimic an expensive resource.
 */
class ExpensiveResource(private val cost: Long) {
    private val listeners = mutableListOf<(String) -> Unit>()
    private val running = AtomicBoolean(false)

    // Written by the thread that calls start() and read by the thread that calls stop(), hence
    // volatile. Not named `thread`, to avoid shadowing the `thread { }` builder function.
    @Volatile
    private var worker: Thread? = null

    /** Starts the background thread. Does nothing when the resource is already running. */
    fun start() {
        if (!running.compareAndSet(false, true)) return
        worker = thread {
            println("Starting resource at ${(System.currentTimeMillis() - startTime)}")
            // Mimic a (somewhat or very) expensive start-up process
            Thread.sleep(cost)
            while (running.get()) {
                val snapshot = synchronized(listeners) { listeners.toList() }
                if (snapshot.isEmpty()) {
                    // Nobody to notify: wait one interval instead of spinning at full speed.
                    Thread.sleep(EMISSION_INTERVAL_MS)
                }
                snapshot.forEach { listener ->
                    println("Sending data")
                    listener((System.currentTimeMillis() - startTime).toString())
                    Thread.sleep(EMISSION_INTERVAL_MS)
                }
            }
            println("Closing resource at ${(System.currentTimeMillis() - startTime)}")
            // Mimic a (somewhat or very) expensive shut-down process
            Thread.sleep(cost)
        }
    }

    /**
     * Adds [listener] to the listeners that receive the data of this resource.
     *
     * @param listener Called on the background thread with each piece of data.
     */
    fun register(listener: (String) -> Unit) {
        synchronized(listeners) { listeners += listener }
    }

    /**
     * Stops the background thread, blocks until it has finished its shut-down and then removes
     * all listeners. Does nothing when the resource is not running.
     */
    fun stop() {
        if (!running.compareAndSet(true, false)) return
        worker?.join()
        synchronized(listeners) { listeners.clear() }
    }

    private companion object {
        /** Pause in milliseconds after each notification of a listener. */
        const val EMISSION_INTERVAL_MS = 200L
    }
}
/**
 * Exposes this [ExpensiveResource] as a [Flow] of the strings it sends to its listeners.
 *
 * The resource is started and a listener that offers every received string to the flow's
 * channel is registered. The resource is stopped when that channel is closed.
 *
 * @return A [Flow] of the data sent by this resource.
 */
fun ExpensiveResource.asFlow(): Flow<String> {
    return flowViaChannel { sink ->
        start()
        register { data ->
            sink.offer(data)
        }
        sink.invokeOnClose {
            stop()
        }
    }
}
/**
* Runnable demonstrations of [ConnectableFlow]. Each use-case blocks the calling thread until it
* has finished and prints what happens, with timestamps relative to [startTime].
*/
class UseCasesForConnectableFlow {
/**
* Publishes an [ExpensiveResource] with a cost of 1000 and connects and closes it explicitly,
* independent of when listeners start and stop collecting.
*/
fun manage_expensive_start_and_stop_of_resource() = runBlocking {
startTime = System.currentTimeMillis()
println("*** Start manage_expensive_start_and_stop_of_resource")
// Set up the cold flow to the Expensive-Resource (cost is high (1000))
val flow: ConnectableFlow<String> = ExpensiveResource(1000).asFlow().publish()
fun getExpensiveResourceFlow() = flow
fun startListening(listenerName: String): Job = launch {
getExpensiveResourceFlow().collect {
println("$listenerName received $it")
}
}
fun serviceStartsResource() = flow.connect(this)
suspend fun serviceStopsResource(connection: Connection) = connection.close()
delay(1000)
// Parties interested in the Expensive-Resource come online and start collecting (e.g. UI/Screens).
// But don't let the starting of the Expensive-Resource be managed by how many
// parties are collecting its data, since this can be somewhat unpredictable, and we don't
// want to risk starting and stopping the Expensive-Resource too frequently.
// Manage it explicitly through some Service instead.
val job1 = startListening("Listener 1")
val job2 = startListening("Listener 2")
delay(2000)
// At this point in time, the Expensive-Resource has not yet been started.
// After a bit of time the Service finally comes online and starts the Expensive-Resource.
// Any listeners will now, after about 3 seconds, receive the data emitted
// from the Expensive-Resource.
val connection = serviceStartsResource()
delay(4000)
// Yet another interested party comes online and starts collecting
val job3 = startListening("Listener 3")
delay(2000)
// *All* interested parties stop listening/collecting for some reason.
// The Expensive-Resource keeps running, though; maybe some other interested party will
// appear soon...
job1.cancelAndJoin()
job2.cancelAndJoin()
job3.cancelAndJoin()
delay(2000)
// A fresh new interested party comes online and starts collecting.
startListening("Listener 4")
// After a bit of time, the Service no longer needs the Expensive-Resource.
// Any parties that are still listening will now stop collecting.
delay(2000)
serviceStopsResource(connection)
}
/**
* Shares an [ExpensiveResource] with a cost of 10 through [share], so that the connection is
* managed by the listeners that start and stop collecting.
*/
fun share_expensive_connection_to_resource() = runBlocking {
startTime = System.currentTimeMillis()
println("*** Start share_expensive_connection_to_resource")
// Set up the cold flow to the Resource and allow it to be shared.
// The Resource will start automatically when one or more parties start collecting.
// The Resource will stop automatically when no more parties are collecting.
val flow: Flow<String> = ExpensiveResource(10).asFlow().share(this)
fun getResourceFlow() = flow
fun startListening(listenerName: String): Job = launch {
getResourceFlow().collect {
println("$listenerName received $it")
}
}
delay(1000)
// Parties interested in the Resource come online and start collecting (e.g. UI/Screens).
// The starting and stopping of the Resource itself is not that expensive, but
// a connection to it is expensive. Share the connection between the interested
// parties, but close the connection as soon as no-one is listening any more.
val job1 = startListening("Listener 1")
val job2 = startListening("Listener 2")
delay(6000)
// Yet another interested party comes online and starts collecting
val job3 = startListening("Listener 3")
delay(2000)
// *All* interested parties stop listening/collecting for some reason.
// The connection to the Resource will be closed and the Resource stops.
job1.cancelAndJoin()
job2.cancelAndJoin()
job3.cancelAndJoin()
delay(2000)
// A fresh new interested party comes online and starts collecting.
// The Resource starts again.
val job4 = startListening("Listener 4")
delay(2000)
// And the last party stops collecting.... The Resource stops again.
job4.cancelAndJoin()
}
}
/** Runs both use-cases of [UseCasesForConnectableFlow], one after the other. */
fun main() {
    with(UseCasesForConnectableFlow()) {
        // Explicitly managed connection, e.g. for a database that is slow to spin up
        // and may be slow to shut down.
        manage_expensive_start_and_stop_of_resource()

        // Automatically shared connection, e.g. for BLE GATT characteristics: re-starting
        // and stopping them is not *that* expensive, but the number of BLE connections
        // should be minimized and shared.
        share_expensive_connection_to_resource()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment