Last active
September 13, 2023 18:32
-
-
Save attilakruchio/726effb6a703953ddd45e36d6d8fb053 to your computer and use it in GitHub Desktop.
MutableSharedFlow vs. Unlimited Channel for one-shot events
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.attila.kruchio.test | |
import android.os.Bundle | |
import android.util.Log | |
import androidx.appcompat.app.AppCompatActivity | |
import kotlin.coroutines.CoroutineContext | |
import kotlin.random.Random | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.Dispatchers | |
import kotlinx.coroutines.SupervisorJob | |
import kotlinx.coroutines.async | |
import kotlinx.coroutines.awaitAll | |
import kotlinx.coroutines.cancelAndJoin | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.coroutineScope | |
import kotlinx.coroutines.delay | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.MutableSharedFlow | |
import kotlinx.coroutines.flow.receiveAsFlow | |
import kotlinx.coroutines.launch | |
// TEST RESULTS (events received out of 500 sent, per dispatcher):
// Dispatchers.Main.immediate:
// - SharedFlow: 256 events received
// - Channel: 500 events received (every event was delivered)
// Dispatchers.Main:
// - SharedFlow: 176 events received
// - Channel: 468 events received
// Dispatchers.IO:
// - SharedFlow: 222 events received
// - Channel: 496 events received
// Dispatchers.Default:
// - SharedFlow: 235 events received
// - Channel: 491 events received
// | |
// ALL-IN-ALL (just for statistics): | |
// - SharedFlow: 889/2000 (44%) (we lost 56% of the one-shot events with SharedFlow!!)
// - Channel: 1955/2000 (98%) (we lost only 2% of the one-shot events with Channel) | |
// | |
// CONCLUSION:
// For one-shot events, the safest approach is to use a Channel(Channel.UNLIMITED) and
// to both send and receive the events on Dispatchers.Main.immediate.
// Note: If you collect the Channel flow in the lifecycleScope of a Fragment or an
// Activity, and send the events from the viewModelScope of the ViewModel, you are safe,
// since both of these scopes use Dispatchers.Main.immediate under the hood.
/**
 * Entry point of the experiment.
 *
 * On creation it builds one [FlowTestCaseRunner] per dispatcher under test and starts it.
 * Every runner gets its own fresh [SharedFlowTestCase] and [ChannelFlowTestCase], so no
 * flow or channel instance is shared between dispatchers.
 */
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Dispatchers are exercised in this exact order; each runner is started right
        // after it is constructed.
        val dispatchersUnderTest = listOf(
            Dispatchers.Default,
            Dispatchers.IO,
            Dispatchers.Main,
            Dispatchers.Main.immediate
        )

        for (dispatcherUnderTest in dispatchersUnderTest) {
            FlowTestCaseRunner(
                testCases = setOf(SharedFlowTestCase(), ChannelFlowTestCase()),
                dispatcher = dispatcherUnderTest
            ).run()
        }
    }
}
/**
 * Runs a set of [FlowTestCase]s concurrently and logs, for each one, how many events reached
 * a collector that is repeatedly cancelled and restarted while events are being sent.
 *
 * @param testCases the event-delivery implementations to compare.
 * @param dispatcher the coroutine context the whole experiment is launched in; it is also
 * printed in the result log line.
 */
class FlowTestCaseRunner(
    private val testCases: Set<FlowTestCase>,
    private val dispatcher: CoroutineContext
) {
    // NOTE(review): nothing in this class cancels this scope, so a started run keeps going
    // until it finishes on its own — confirm that is acceptable for the caller.
    private val testScope = CoroutineScope(SupervisorJob() + dispatcher)

    /**
     * Starts all test cases in parallel inside [testScope] and returns immediately.
     * Once every test case has finished, one log line per test case is written.
     */
    fun run() {
        testScope.launch {
            val results = testCases
                .map { testCase -> async { testCase.name() to testCase.run() } }
                .awaitAll()
            results.forEach { (name, result) ->
                // A single '$' is required here: "$$dispatcher" would print a literal '$'
                // in front of the dispatcher's string representation.
                Log.d("Test results", "Dispatcher: $dispatcher, Name: $name, Result: $result")
            }
        }
    }

    /**
     * Executes one test case: a sender coroutine pushes [EVENT_COUNT] events while a second
     * coroutine starts and cancels a collector [COLLECTOR_RESTARTS] times.
     *
     * @return the number of events that were actually collected.
     */
    private suspend fun FlowTestCase.run(): Int {
        // Only one collector is alive at a time (each is joined before the next starts),
        // and the list is read only after coroutineScope has completed.
        val collectedElements = mutableListOf<Int>()
        coroutineScope {
            val flow = implementation()

            // Simulates event sending with a short random pause between events.
            launch {
                repeat(EVENT_COUNT) { event ->
                    send(event)
                    delay(Random.nextLong(MAX_DELAY_MILLIS_EXCLUSIVE))
                }
            }

            // Simulates the collector being cancelled and restarted over and over.
            launch {
                repeat(COLLECTOR_RESTARTS) {
                    val receivingJob = launch {
                        flow.collect { collectedElements.add(it) }
                    }
                    // Simulates an orientation change happening a moment later.
                    delay(Random.nextLong(MAX_DELAY_MILLIS_EXCLUSIVE))
                    // Fragment or Activity enters onStop() and cancels the coroutine
                    // that was launched with repeatOnLifecycle().
                    receivingJob.cancelAndJoin()
                    // After some delay it becomes active again and restarts collection.
                    delay(Random.nextLong(MAX_DELAY_MILLIS_EXCLUSIVE))
                }
            }
        }
        return collectedElements.size
    }

    private companion object {
        /** Number of events the sender coroutine sends per test case. */
        const val EVENT_COUNT = 500

        /** Number of times the collector is started and then cancelled per test case. */
        const val COLLECTOR_RESTARTS = 1000

        /** Exclusive upper bound, in milliseconds, of every simulated random pause. */
        const val MAX_DELAY_MILLIS_EXCLUSIVE = 10L
    }
}
/**
 * One event-delivery strategy under test: a way to send `Int` events and a [Flow] through
 * which a collector receives them.
 */
interface FlowTestCase {

    /** Human-readable label for this test case. */
    fun name(): String

    /** The flow a collector should collect in order to receive the sent events. */
    fun implementation(): Flow<Int>

    /** Sends a single event [value] through the underlying primitive. */
    suspend fun send(value: Int)
}
/**
 * [FlowTestCase] backed by a [MutableSharedFlow] created with its default arguments.
 */
class SharedFlowTestCase : FlowTestCase {

    // Single shared flow instance: every collector and every send() goes through it.
    private val sharedEvents = MutableSharedFlow<Int>()

    override fun name(): String = "SharedFlowTestCase"

    /** Exposes the shared flow itself as the flow to collect. */
    override fun implementation(): Flow<Int> = sharedEvents

    /** Emits [value] into the shared flow. */
    override suspend fun send(value: Int) = sharedEvents.emit(value)
}
/**
 * [FlowTestCase] backed by a [Channel] created with [Channel.UNLIMITED] capacity and
 * exposed to collectors through [receiveAsFlow].
 */
class ChannelFlowTestCase : FlowTestCase {

    // Single channel instance: every send() and every flow returned below uses it.
    private val eventChannel = Channel<Int>(Channel.UNLIMITED)

    override fun name(): String = "ChannelFlowTestCase"

    /** Wraps the channel in a flow; a new wrapper is created on every call. */
    override fun implementation(): Flow<Int> = eventChannel.receiveAsFlow()

    /** Sends [value] into the channel. */
    override suspend fun send(value: Int) = eventChannel.send(value)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment