Skip to content

Instantly share code, notes, and snippets.

@gmk57
Last active February 13, 2024 15:17
Show Gist options
  • Star 29 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save gmk57/330a7d214f5d710811c6b5ce27ceedaa to your computer and use it in GitHub Desktop.
Save gmk57/330a7d214f5d710811c6b5ce27ceedaa to your computer and use it in GitHub Desktop.
Sending events to UI with Channel/Flow + custom collector (see my first comment for reasons behind it)
/**
* Starts collecting a flow when the lifecycle is started, and **cancels** the collection on stop.
* This is different from `lifecycleScope.launchWhenStarted { flow.collect{...} }`, in which case
* the coroutine is just suspended on stop.
*/
inline fun <reified T> Flow<T>.collectWhileStarted(
lifecycleOwner: LifecycleOwner,
noinline action: suspend (T) -> Unit
) {
object : DefaultLifecycleObserver {
private var job: Job? = null
init {
lifecycleOwner.lifecycle.addObserver(this)
}
override fun onStart(owner: LifecycleOwner) {
job = owner.lifecycleScope.launch {
collect { action(it) }
}
}
override fun onStop(owner: LifecycleOwner) {
job?.cancel()
job = null
}
}
}
class MyViewModel : ViewModel() {
// You can specify exact buffer size and onBufferOverflow strategy here, or go full blast with Channel.UNLIMITED
private val eventChannel = Channel<String>(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val eventFlow = eventChannel.receiveAsFlow()
fun sendEvent(element: String) = eventChannel.trySend(element) // `trySend` replaces `offer` since Coroutines 1.5
}
class MyFragment : Fragment(R.layout.fragment_my) {
private val viewModel: MyViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// You can collect flow in `onCreate` using `this` as lifecycleOwner
// This is a bit more efficient: `LifecycleObserver` is registered only once
viewModel.eventFlow.collectWhileStarted(this) { Log.i(TAG, "event: $it") }
}
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
// Or you can collect flow in `onViewCreated` using `viewLifecycleOwner`
// This more closely resembles a typical LiveData observer
viewModel.eventFlow.collectWhileStarted(viewLifecycleOwner) { Log.i(TAG, "event: $it") }
}
}
@gmk57
Copy link
Author

gmk57 commented Feb 21, 2021

On Android, we sometimes need to process events from our ViewModel exactly once. This is the so-called SingleLiveEvent case.

Aforementioned article recommends to use LiveData with Event wrapper. It works, but has some limitations:

  1. If multiple events occur while observer is inactive (app in background, fragment in back stack, etc), only the last one is preserved.
  2. LiveData itself supports multicast, but Event can be consumed only once (= by only one observer).

Some third-party libraries emerged too. They support multiple observers, but at the cost of reduced reliability. One of them just drops events emitted during configuration change. The other one enqueues events in this case, but only the first subscriber receives them afterwards.

Then comes the "new hotness": Kotlin Flow, especially its recently presented hot flow flavors (pun intended). StateFlow is very similar to LiveData, while SharedFlow, which by default does not replay (resembling good old PublishSubject from RxJava), looks promising for sending events. However, it has the same issue with reliability: events sent while there is no subscribers (e.g. screen is being rotated) are immediately lost.

Roman Elizarov, father of Kotlin coroutines and flows, suggests to use unicast Channel.receiveAsFlow() for events to be processed exactly once (see "A use-case for channels" here). With this construct, "posted events are never dropped by default", he says. We can even use Channel(capacity = Channel.UNLIMITED) to be sure that buffer limits won't affect our events.

Unfortunately, there is a catch again. To be on the safe side (especially important for fragment transactions), we need to collect this Flow in Activity or Fragment only between onStart and onStop. Easiest way to do it is by lifecycleScope.launchWhenStarted { flow.collect {...} } extension function. But due to the quirks of its implementation, events will be lost if emitted when lifecycle is stopped, but not yet destroyed.

To achieve a complete victory we need a custom collector, which will unsubscribe on stop and subscribe again on start. Luckily, we can borrow one from this article by Patrick Steiger, despite the fact that he was solving a completely different problem with it. You see this Flow.collectWhileStarted() above, slightly shrunk by me. It conveniently has the same syntax as LiveData.observe().

But what about multicast?
It looks like multicast to arbitrary number of pausable and recreatable observers with "exactly once" semantics guarantee is fundamentally impossible. So if you really need to send the same event to N observers, you should create N channels and explicitly put N copies of your event in them (maybe using some collection of channels to avoid repetition).

Update:

This solution is not thread-safe, see details below.

@gmk57
Copy link
Author

gmk57 commented Feb 21, 2021

So at the end of the day, the main difference between this solution and LiveData<Event<>> is the ability to queue multiple events while in background.

Do you need it? It depends on your use case. Sometimes it is crucial. Just make sure your app is not overwhelmed by these queued events (e.g. multiple fragment transactions at once, 50 toasts in a row...). For example, calling findNavController().navigate(R.id.someAction) twice in quick succession will crash with "IllegalArgumentException: Navigation action/destination id/someAction cannot be found from the current destination"

In other cases you might be better off with at most one event: LiveData<Event<>> / StateFlow<Event<>> / Channel(capacity = 1). For example, if some network operation has failed in the background, and then automatic retry succeeded, you might want to clear queued error event before the user sees it.

Also please keep in mind that while this solution avoids some known pitfalls leading to lost events, it still can't guarantee their delivery: the app may be killed in background with all its queued events. This remark about undelivered elements in Channel also bothers me a little (I'm not sure if this can happen in our particular case).

@gmk57
Copy link
Author

gmk57 commented Feb 21, 2021

I've just discovered a nice article by Michael Ferguson, where the same idea is explained in much more detail. It's a pity I haven't seen it earlier.

@fergusonm
Copy link

fergusonm commented Feb 21, 2021

Thanks for the mention! It's neat to see others have the same struggles and come to the same conclusion independently!

@gmk57
Copy link
Author

gmk57 commented Feb 21, 2021

Well, not quite independently: it seems we both took inspiration from the same articles by Jose Alcérreca, Roman Elizarov and Patrick Steiger. ;))

@gmk57
Copy link
Author

gmk57 commented Feb 22, 2021

Michael recently brought up interesting question about handling buffer overflows. For some use cases limited buffer with DROP_OLDEST or DROP_LATEST strategy may be inappropriate, while Channel.UNLIMITED becomes a silent memory leak if we forget to implement an observer. Unfortunately, there seems to be no easy way to measure current buffer size, and even Channel.isFull is scheduled for removal. One possible approach would be to appoint Channel with a large (but still limited) buffer and BufferOverflow.SUSPEND strategy, and then report overflows (offer returning false) to some monitoring system for investigation.

@mochadwi
Copy link

mochadwi commented Mar 14, 2021

such a well written discovery :))) amazing work you've done (both of you and the article: by Jose Alcérreca, Roman Elizarov and Patrick Steiger.)

been struggling with lifecycle-aware conditions in our apps, and you guys solve most of our usecases :)))))

@gmk57 @fergusonm

@fergusonm
Copy link

Google came up with their own solution just yesterday: https://medium.com/androiddevelopers/a-safer-way-to-collect-flows-from-android-uis-23080b1f8bda

It will be included in lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01

From a quick look it appears to be similar to our solution. It's a bit more flexible in that you can pass in specifically which lifecycle state you want to use. They also add details about using it with compose that I hadn't even considered.

@mochadwi
Copy link

Google came up with their own solution just yesterday: https://medium.com/androiddevelopers/a-safer-way-to-collect-flows-from-android-uis-23080b1f8bda

It will be included in lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01

From a quick look it appears to be similar to our solution. It's a bit more flexible in that you can pass in specifically which lifecycle state you want to use. They also add details about using it with compose that I hadn't even considered.

awesome, thanks for the updates! it's official google will for now on use Kotlin solutions :)))

@gmk57
Copy link
Author

gmk57 commented Mar 28, 2021

@fergusonm Many thanks for heads-up! Google's solution looks a bit too verbose to me, I shared my thoughts in the comments. But everybody will use it anyway. ;)

@gmk57
Copy link
Author

gmk57 commented May 16, 2021

My earlier suspicions about events getting lost in Channels (last sentence here) turned out to be right: since Coroutines 1.4.0 sent events may be dropped due to receiver cancellation.

This does not happen when sendEvent is called from the main thread: in this case collector runs synchronously, before sender returns (thanks to Dispatchers.Main.immediate, inherited from LifecycleCoroutineScope).

But events sent from background thread may get lost. For example, put this to MyFragment.onPause:
GlobalScope.launch(Dispatchers.IO) { viewModel.sendEvent("onPause") }
and rotate the screen several times. Result is nondeterministic.

In principle we could use onUndeliveredElement callback to resend dropped items, but that messes up their ordering.

Bottom line: to be on the safe side, I would still use an Event wrapper, as Roman Elizarov had suggested. Multiple queued events can be supported by SharedFlow with replay:

val eventFlow = MutableSharedFlow<Event<String>>(replay = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
fun sendEvent(element: String) = eventFlow.tryEmit(Event(element))
...
viewModel.eventFlow.collectIn(viewLifecycleOwner) { it.consume()?.let { Log.i(TAG, "event: $it") } }

DROP_OLDEST strategy ensures consistent behavior between "observer is suspended" (e.g. when collected in lifecycleScope.launchWhenStarted) and "no observer" cases (then old events are always dropped).

collectIn is from this suggestion, but specific collection technique does not make real difference here.

Bonus points:

  • secondary application-scoped observers (using Event.peek() instead of consume()) can be supported, e.g. for logging
  • more complex scenarios may be implemented with replayCache and subscriptionCount: clearing errors, checking when buffer is filled, posting notification if app is in background, etc

@fergusonm
Copy link

Wow, you're right. I wish I had paid more attention to that comment.

I had to see the behaviour for myself in the playground: https://pl.kotl.in/0K5ON5dXp

As you mentioned the event can get dropped due to receiver cancellation. It takes a few runs through that playground sample to prove it but it does happen.

Some results from the playground test:


// "Normal" operation
ViewModel Sending...
1 Receiving
1 Received: Test
1 isActive: true
Cancelling first scope
2 Receiving
Cancelling second scope
// "Retried" operation via onUnDeliveredElement callback
ViewModel Sending...
1 Receiving
Cancelling first scope
Undelievered: Test
ViewModel Sending again...
2 Receiving
2 Received: Test
2 isActive: true
Cancelling second scope
ViewModel Sent again!
// "Normal" operation received on in-active coroutine.
// This is an interesting result. Any cancellation cooperation
// eg, even just calling `delay` could potentially mean 
// an unprocessed event.
ViewModel Sending...
1 Receiving
Cancelling first scope
1 Received: Test
1 isActive: !!!!!!!!!!! false!
2 Receiving
Cancelling second scope

I feel like the last result adds yet another reason to going back to using an event wrapper. Though I'm not entirely certain what one should do when the event is "half" processed.

@gmk57
Copy link
Author

gmk57 commented May 16, 2021

Maybe this third result (event received when scope seems already cancelled) is related to calling firstFragmentScope.cancel() from background thread. AFAIK, on Android scope cancellation happens on the main thread. So if you collect on the main thread too, scope can't get cancelled from under you concurrently with receiving event. The main reason for dropping "atomic cancellation" for channels was to prevent issues like this one.

On the other hand, delay between consuming and processing the event brings the risk of cancellation in-between; this would be true for "event wrapper" too.

@erikhuizinga
Copy link

erikhuizinga commented Oct 8, 2021

It's unfortunate that undelivered elements are possible with channels. It leads to the situation that today we still have no good alternative to the single live event (SLE) pattern. Using a shared flow of consumable events (SFCE) is the only alternative, but it's not ideal.

(Of course this doesn't matter if you don't care about lost elements.)

The big difference between SLE and SFCE is that the former is a publisher that publishes to one subscriber only, while the latter publishes to all subscribers but the event carries the consumed state. While SFCE is better in terms of separation of concerns, it does mean that all subscribers will have to remember to try-to-consume-if-not-consumed. That is enforced by the Event wrapper, but it's boilerplate for the subscribers.

Wouldn't it be nice to have a very specific flow subclass that manages this for you? Not sure if possible, but it should:

  • Allow 0 or more subscribers (like a shared flow).
  • Emit to 1 subscriber only (unlike a shared flow):
    • If 0 subscribers: buffer with drop oldest strategy.
    • If 1+ subscribers: guarantee that exactly 1 subscriber receives the value.
  • Do no reemit an element even if the number of subscribers goes from 1+ to 0 and back to 1+.

Would something like that be possible? E.g. through an operator extension on SharedFlow. It would simplify the observers: just observe the element, not some wrapped value.

It wouldn't support your 'Bonus points', but that's fine: the event wrapper can still be used for these cases.

@erikhuizinga
Copy link

erikhuizinga commented Oct 8, 2021

Or maybe simpler: just expose the events like so (in my previous comment's simplified use case)?

private val _events = MutableSharedFlow<Event<E>>(/*..*/)
val events = _events.mapNotNull { it.unconsumedValueOrNull() }

@gmk57
Copy link
Author

gmk57 commented Oct 11, 2021

Thanks for the suggestion!

As far as I could test, mapNotNull works correctly. Personally, I find consuming inside a "shared" flow (technically it is not a SharedFlow after map) a bit counter-intuitive.

I'm not sure if consume()?.let { ... } is a boilerplate, I see it as a clear indication that we're dealing with events here, not some state. My main concern is that for Event<Unit> one can easily forget to add consume(), I've hit this a couple of times.

Having multiple primary (consuming) observers for a single flow is tricky, because it's hard to predict which one will process the event. Of course, if it doesn't matter, good for you. ;)

Nowadays, I prefer to expose a single StateFlow<SomeViewState> to UI, including all possible events. Event wrapper plays nice with it.

But all this is a matter of taste. If mapNotNull { it.consume() } looks and works well for you, great! :)

@erikhuizinga
Copy link

The fact that mapNotNull returns a Flow and not a SharedFlow makes sense, because after the map the property of a shared flow may no longer hold:

A hot Flow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values.

(source)

In fact, a Flow might emit to zero or more collectors. That's not too generic, that's just versatile.

You can always create extensions for reuse, e.g.

fun <T : Any, V> SharedFlow<Event<T>>.mapNotNullEvents(transform: suspend (T) -> V) /* : Flow<V> */ =
    mapNotNull { it.unconsumedValueOrNull() }.map(transform)

fun <T : Any, V> SharedFlow<Event<T>>.mapEvents(transform: suspend (T) -> V) /* : Flow<V?> */ =
    map { it.unconsumedValueOrNull()?.let { value -> transform(value) } }

fun <T : Any> SharedFlow<Event<T>>.onEachEvent(action: suspend (T) -> Unit) /* : Flow<Event<T>> */ =
    onEach { it.unconsumedValueOrNull()?.let { value -> action(value) } }

It would've been nicer if onEach returned a SharedFlow instead of a Flow, as it preserves the broadcast property in the type signature.

@fergusonm
Copy link

So, for what it's worth Google's updated their own guidance on this problem yet again. https://developer.android.com/jetpack/guide/ui-layer/events (I'm not quite sure when this updated guide was posted.)

From their guide:

Note: In some apps, you might have seen ViewModel events being exposed to the UI using Kotlin Channels or other reactive streams. These solutions usually require workarounds such as event wrappers in order to guarantee that events are not lost and that they're consumed only once.

Requiring workarounds is an indication that there's a problem with these approaches. The problem with exposing events from the ViewModel is that it goes against the state-down-events-up principle of Unidirectional Data Flow.

The TL;DR of it is for the view to call back to the view model when an event has been processed. This is in-line with their recommendations for unidirectional data flow and feels very Compose-y. It ignores a lot of issues, in my opinion, but it is what it is.

@gmk57
Copy link
Author

gmk57 commented Dec 23, 2021

@fergusonm Thanks for the update, I haven't seen this article before.

It's good to see that Google is aware of the issue. I like UDF & try to use it as much as possible. From my point of view, the main problem is an "impedance mismatch" between state-down-events-up principle and event-driven classic Android UI.

Event wrappers and userMessageShown() are both workarounds for this mismatch, with the latter being much more verbose. Wrapper just encapsulates basically the same logic to avoid repetition.

Compose mostly alleviates this concern, e.g. AlertDialog is now just a composable and can be clearly modeled as part of state.

On the other side, AFAIK, Navigation Compose is still event-based. Is this code guaranteed to be called only once?

LaunchedEffect(viewModel.uiState)  {
    if (viewModel.uiState.isUserLoggedIn) {
        currentOnUserLogIn()
    }
}

Another point that bothers me is the suggestion to handle "UI behavior logic" directly in UI, bypassing ViewModel. This goes against the "single source of truth" principle. It also complicates handling configuration changes, which becomes a larger issue on Android 12.

BTW, one phrase caught my eye: "business logic remains the same for the same app on different mobile platforms". Is Google starting to push us towards KMP? ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment