Skip to content

Instantly share code, notes, and snippets.

@attilakruchio
Last active September 13, 2023 18:32
Show Gist options
  • Save attilakruchio/726effb6a703953ddd45e36d6d8fb053 to your computer and use it in GitHub Desktop.
Save attilakruchio/726effb6a703953ddd45e36d6d8fb053 to your computer and use it in GitHub Desktop.
MutableSharedFlow vs. Unlimited Channel for one-shot events
package com.attila.kruchio.test
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
// TEST RESULTS:
// Dispatchers.Main.immediate:
// - SharedFlow: 256 received event
// - Channel: 500 received event (we got every event)
// Dispatchers.Main:
// - SharedFlow: 176 received event
// - Channel: 468 received event
// Dispatchers.IO:
// - SharedFlow: 222 received event
// - Channel: 496 received event
// Dispatchers.Default:
// - SharedFlow: 235 received event
// - Channel: 491 received event
//
// ALL-IN-ALL (just for statistics):
// - SharedFlow: 889/2000 (44%) (we lost 66% of the one-shot events with SharedFlow!!)
// - Channel: 1955/2000 (98%) (we lost only 2% of the one-shot events with Channel)
//
// CONCLUSION:
// For one-shot events, it's the safest way to use a Channel(Channel.UNLIMITED) and
// sending and receiving the one-shot events on Dispatchers.Main.immediate
// Note: If you collect the Channel flow in the lifecycleScope in either a Fragment
// or Activity, and send an event in the viewModelScope of the ViewModel, you are safe
// since both of these scopes are using Dispatchers.Main.immediate under-the-hood.
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
listOf(
Dispatchers.Default,
Dispatchers.IO,
Dispatchers.Main,
Dispatchers.Main.immediate
).forEach { dispatcher ->
val testRunner = FlowTestCaseRunner(
testCases = setOf(SharedFlowTestCase(), ChannelFlowTestCase()),
dispatcher = dispatcher
)
testRunner.run()
}
}
}
class FlowTestCaseRunner(
private val testCases: Set<FlowTestCase>,
private val dispatcher: CoroutineContext
) {
private val testScope = CoroutineScope(SupervisorJob() + dispatcher)
fun run() {
testScope.launch {
val results = testCases.map { async { it.name() to it.run() } }.awaitAll()
results.forEach { (name, result) ->
Log.d("Test results", "Dispatcher: $$dispatcher, Name: $name, Result: $result")
}
}
}
private suspend fun FlowTestCase.run(): Int {
val collectedElements = mutableListOf<Int>()
coroutineScope {
val flow = implementation()
// simulating event sending
launch {
repeat(500) {
send(it)
delay(Random.nextLong(10))
}
}
// simulating collector cancellation
launch {
repeat(1000) {
val receivingJob = launch {
flow.collect { collectedElements.add(it) }
}
// simulating an orientation change here
delay(Random.nextLong(10))
// Fragment or Activity enters onStop(), cancels the coroutine
// which was launched with repeatOnLifecycle()
receivingJob.cancelAndJoin()
// after some delay, it becomes active again and starts collecting the flow
delay(Random.nextLong(10))
}
}
}
return collectedElements.size
}
}
interface FlowTestCase {
fun name(): String
fun implementation(): Flow<Int>
suspend fun send(value: Int)
}
class SharedFlowTestCase : FlowTestCase {
private val underlyingFlow = MutableSharedFlow<Int>()
override fun name(): String = "SharedFlowTestCase"
override fun implementation(): Flow<Int> {
return underlyingFlow
}
override suspend fun send(value: Int) {
underlyingFlow.emit(value)
}
}
class ChannelFlowTestCase : FlowTestCase {
private val underlyingFlow = Channel<Int>(Channel.UNLIMITED)
override fun name(): String = "ChannelFlowTestCase"
override fun implementation(): Flow<Int> {
return underlyingFlow.receiveAsFlow()
}
override suspend fun send(value: Int) {
underlyingFlow.send(value)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment