@danielesegato
Last active April 30, 2024 18:59
Coroutine: Channel Shared Flow
/*
* Copyright 2022 Daniele Segato
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
// source: https://gist.github.com/danielesegato/160fabdcba5f563f1a171012377ea041
/**
* A SharedFlow backed by a channel that holds onto elements until there is at least 1 subscriber
* to consume them.
*/
interface ChannelSharedFlow<T> : SharedFlow<T>, FlowCollector<T>

fun <T> ChannelSharedFlow(
    scope: CoroutineScope,
    replay: Int = 0,
    bufferCapacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((T) -> Unit)? = null,
): ChannelSharedFlow<T> = ChannelSharedFlowImpl(
    scope = scope,
    replay = replay,
    bufferCapacity = bufferCapacity,
    onBufferOverflow = onBufferOverflow,
    onUndeliveredElement = onUndeliveredElement,
)

private class ChannelSharedFlowImpl<T>(
    scope: CoroutineScope,
    replay: Int = 0,
    bufferCapacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((T) -> Unit)? = null,
) : ChannelSharedFlow<T> {
    // Emitted elements wait in this channel until the shared flow below collects it.
    private val channel = Channel<T>(
        capacity = bufferCapacity,
        onBufferOverflow = onBufferOverflow,
        onUndeliveredElement = onUndeliveredElement,
    )

    // WhileSubscribed() collects the channel only while there is at least one subscriber.
    private val shared = channel
        .receiveAsFlow()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
            replay = replay,
        )

    init {
        // Close the channel once the scope is cancelled.
        scope.launch {
            try {
                awaitCancellation()
            } finally {
                channel.close()
            }
        }
    }

    override suspend fun emit(value: T) {
        channel.send(value)
    }

    // Read through on every access: a plain initializer would capture a single
    // snapshot (the empty cache) taken at construction time.
    override val replayCache: List<T>
        get() = shared.replayCache

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        shared.collect(collector)
    }
}

Why this exists

By default, a SharedFlow drops emitted elements, along with its buffer, when there is no subscriber. This is the intended behavior of SharedFlow. Sometimes, though, you need a SharedFlow that holds onto elements until a subscriber comes along and starts collecting them.

This SharedFlow implementation does just that.
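
For contrast, here is a minimal sketch of the default behavior, assuming only kotlinx.coroutines and a plain MutableSharedFlow with no replay. The function name lateSubscriberOnPlainSharedFlow and the 200 ms timeout are illustrative choices, not part of this gist.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: emits before anyone subscribes, then subscribes late.
fun lateSubscriberOnPlainSharedFlow(): String? = runBlocking {
    val plain = MutableSharedFlow<String>() // replay = 0, no extra buffer

    // No subscriber yet: emit returns immediately and the value is dropped.
    plain.emit("Hi")

    // The late subscriber never sees "Hi", so the timeout fires and we get null.
    withTimeoutOrNull(200L) { plain.first() }
}
```

A subscriber that arrives after the emission has nothing to receive, which is the gap ChannelSharedFlow is meant to close.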

Usage

// create the ChannelSharedFlow
val shared = ChannelSharedFlow<String>(
   scope = scope, // use the scope that makes sense for you
)

shared.emit("Hi") // suspend function

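This SharedFlow does not consume elements from its buffer until there is at least 1 subscriber to receive them. This means you will not miss any events even if you have no subscriber for a while (with the default BufferOverflow.SUSPEND, emit suspends once the buffer is full instead of dropping). If there are multiple subscribers at the same time, they all receive the events.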
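
The late-subscriber behavior can be sketched without the wrapper class, using the same building blocks (Channel, receiveAsFlow, shareIn with WhileSubscribed). The function name collectLateSubscriber and the local sharingScope are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends two elements before any subscriber exists,
// then subscribes and returns what the late subscriber received.
fun collectLateSubscriber(): List<String> = runBlocking {
    // Separate Job so cancelling the sharing scope does not cancel runBlocking.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val channel = Channel<String>(Channel.BUFFERED)
    val shared = channel
        .receiveAsFlow()
        .shareIn(sharingScope, SharingStarted.WhileSubscribed())

    // Nobody is subscribed, so the channel is not collected yet:
    // both elements stay in the channel buffer.
    channel.send("Hi")
    channel.send("there")

    // The first subscriber triggers collection of the channel.
    val received = shared.take(2).toList()

    sharingScope.cancel()
    received
}
```

Both elements reach the subscriber even though they were sent before it subscribed, because WhileSubscribed only starts draining the channel once the subscription count is above zero.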

It uses a Channel under the hood to keep the buffer and builds a SharedFlow on top of it using the given scope. When the scope is canceled, the channel is also closed.
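
The close-on-cancel part can be sketched in isolation as follows. This mirrors the init block of the implementation under the assumption that the watcher coroutine has had a chance to start. The function name channelClosedAfterScopeCancel is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns true if the channel was closed by scope cancellation.
fun channelClosedAfterScopeCancel(): Boolean = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    val channel = Channel<String>(Channel.BUFFERED)

    // Same pattern as the init block: park until cancelled, then close the channel.
    val watcher = scope.launch {
        try {
            awaitCancellation()
        } finally {
            channel.close()
        }
    }

    // Let the watcher start. A coroutine cancelled before it starts never runs
    // its body, so its finally block would not execute.
    yield()

    scope.cancel()
    watcher.join()

    // Sending to a closed channel fails with a "closed" result.
    channel.trySend("late").isClosed
}
```

Once the channel is closed, a suspending send on it throws ClosedSendChannelException, so callers of emit should expect failures after the scope has been cancelled.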

This snippet of code demonstrates it: https://pl.kotl.in/69JeANrYg

Install and Modifications

To install, just copy ChannelSharedFlow.kt into your codebase. You can apply your own package, but keep the License comment.

You are free to edit the code as per the Apache License 2.0. If you are in doubt about what you can do, check this.
