```kotlin
/**
 * Starts collecting a flow when the lifecycle is started, and **cancels** the collection on stop.
 * This is different from `lifecycleScope.launchWhenStarted { flow.collect{...} }`, in which case
 * the coroutine is just suspended on stop.
 */
inline fun <reified T> Flow<T>.collectWhileStarted(
    lifecycleOwner: LifecycleOwner,
    noinline action: suspend (T) -> Unit
) {
    object : DefaultLifecycleObserver {
        private var job: Job? = null

        init {
            lifecycleOwner.lifecycle.addObserver(this)
        }

        override fun onStart(owner: LifecycleOwner) {
            job = owner.lifecycleScope.launch {
                collect { action(it) }
            }
        }

        override fun onStop(owner: LifecycleOwner) {
            job?.cancel()
            job = null
        }
    }
}

class MyViewModel : ViewModel() {
    // You can specify exact buffer size and onBufferOverflow strategy here, or go full blast with Channel.UNLIMITED
    private val eventChannel = Channel<String>(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val eventFlow = eventChannel.receiveAsFlow()

    fun sendEvent(element: String) = eventChannel.trySend(element) // `trySend` replaces `offer` since Coroutines 1.5
}

class MyFragment : Fragment(R.layout.fragment_my) {
    private val viewModel: MyViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // You can collect flow in `onCreate` using `this` as lifecycleOwner
        // This is a bit more efficient: `LifecycleObserver` is registered only once
        viewModel.eventFlow.collectWhileStarted(this) { Log.i(TAG, "event: $it") }
    }

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        // Or you can collect flow in `onViewCreated` using `viewLifecycleOwner`
        // This more closely resembles a typical LiveData observer
        viewModel.eventFlow.collectWhileStarted(viewLifecycleOwner) { Log.i(TAG, "event: $it") }
    }
}
```
So at the end of the day, the main difference between this solution and `LiveData<Event<>>` is the ability to queue multiple events while in the background.

Do you need it? It depends on your use case. Sometimes it is crucial. Just make sure your app is not overwhelmed by these queued events (e.g. multiple fragment transactions at once, 50 toasts in a row...). For example, calling `findNavController().navigate(R.id.someAction)` twice in quick succession will crash with "IllegalArgumentException: Navigation action/destination id/someAction cannot be found from the current destination".

In other cases you might be better off with at most one event: `LiveData<Event<>>` / `StateFlow<Event<>>` / `Channel(capacity = 1)`. For example, if some network operation has failed in the background, and then an automatic retry succeeded, you might want to clear the queued error event before the user sees it.
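For illustration, a minimal sketch of the `Channel(capacity = 1)` variant (the class name is made up for this example); with `DROP_OLDEST`, a newer event simply replaces a stale one that the UI has not consumed yet:

```kotlin
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow

class AtMostOneEventViewModel : ViewModel() {
    // At most one pending event: a new event overwrites the old, unconsumed one.
    private val eventChannel = Channel<String>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val eventFlow = eventChannel.receiveAsFlow()

    fun sendEvent(element: String) = eventChannel.trySend(element)
}
```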
Also please keep in mind that while this solution avoids some known pitfalls leading to lost events, it still can't guarantee their delivery: the app may be killed in the background with all its queued events. This remark about undelivered elements in `Channel` also bothers me a little (I'm not sure if this can happen in our particular case).
I've just discovered a nice article by Michael Ferguson, where the same idea is explained in much more detail. It's a pity I hadn't seen it earlier.
Thanks for the mention! It's neat to see others have the same struggles and come to the same conclusion independently!
Well, not quite independently: it seems we both took inspiration from the same articles by Jose Alcérreca, Roman Elizarov and Patrick Steiger. ;))
Michael recently brought up an interesting question about handling buffer overflows. For some use cases a limited buffer with the `DROP_OLDEST` or `DROP_LATEST` strategy may be inappropriate, while `Channel.UNLIMITED` becomes a silent memory leak if we forget to implement an observer. Unfortunately, there seems to be no easy way to measure the current buffer size, and even `Channel.isFull` is scheduled for removal. One possible approach would be to give the `Channel` a large (but still limited) buffer and the `BufferOverflow.SUSPEND` strategy, and then report overflows (`offer` returning `false`) to some monitoring system for investigation.
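For illustration, a minimal sketch of that last approach; `reportOverflow` is a hypothetical stand-in for whatever monitoring or crash-reporting hook a project uses:

```kotlin
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow

class MonitoredEventsViewModel : ViewModel() {
    // Large but bounded buffer; BufferOverflow.SUSPEND is the default strategy.
    private val eventChannel = Channel<String>(capacity = 100)
    val eventFlow = eventChannel.receiveAsFlow()

    fun sendEvent(element: String) {
        val result = eventChannel.trySend(element)
        if (result.isFailure) {
            // Buffer is full (or the channel is closed): the event was not queued.
            reportOverflow("eventChannel overflow, dropped: $element")
        }
    }

    private fun reportOverflow(message: String) {
        // Hypothetical: forward to your logging/monitoring system of choice.
    }
}
```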
Such a well-written discovery :))) Amazing work you've done (both of you, and the articles by Jose Alcérreca, Roman Elizarov and Patrick Steiger). We've been struggling with lifecycle-aware conditions in our apps, and you guys solve most of our use cases :)))))
Google came up with their own solution just yesterday: https://medium.com/androiddevelopers/a-safer-way-to-collect-flows-from-android-uis-23080b1f8bda
It will be included in `lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01`.
From a quick look it appears to be similar to our solution. It's a bit more flexible in that you can pass in specifically which lifecycle state you want to use. They also add details about using it with Compose that I hadn't even considered.
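For reference, roughly what usage of the new API looks like, based on that article (a sketch; `repeatOnLifecycle` ships in `lifecycle-runtime-ktx:2.4.0`, so check the release for exact details):

```kotlin
import android.os.Bundle
import android.util.Log
import android.view.View
import androidx.fragment.app.Fragment
import androidx.fragment.app.viewModels
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.launch

class MyFragment : Fragment(R.layout.fragment_my) {
    private val viewModel: MyViewModel by viewModels()

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        viewLifecycleOwner.lifecycleScope.launch {
            // The block is (re)started every time the lifecycle reaches STARTED
            // and cancelled when it drops below that state.
            viewLifecycleOwner.lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.eventFlow.collect { Log.i(TAG, "event: $it") }
            }
        }
    }
}
```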
> Google came up with their own solution just yesterday: https://medium.com/androiddevelopers/a-safer-way-to-collect-flows-from-android-uis-23080b1f8bda
> It will be included in `lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01`.
> From a quick look it appears to be similar to our solution. It's a bit more flexible in that you can pass in specifically which lifecycle state you want to use. They also add details about using it with Compose that I hadn't even considered.
Awesome, thanks for the updates! It's official: Google will from now on use Kotlin solutions :)))
@fergusonm Many thanks for the heads-up! Google's solution looks a bit too verbose to me, I shared my thoughts in the comments. But everybody will use it anyway. ;)
My earlier suspicions about events getting lost in Channels (last sentence here) turned out to be right: since Coroutines 1.4.0, sent events may be dropped due to receiver cancellation.

This does not happen when `sendEvent` is called from the main thread: in this case the collector runs synchronously, before the sender returns (thanks to `Dispatchers.Main.immediate`, inherited from `LifecycleCoroutineScope`).

But events sent from a background thread may get lost. For example, put this in `MyFragment.onPause`:
```kotlin
GlobalScope.launch(Dispatchers.IO) { viewModel.sendEvent("onPause") }
```
and rotate the screen several times. The result is nondeterministic.

In principle we could use the `onUndeliveredElement` callback to resend dropped items, but that messes up their ordering.
Bottom line: to be on the safe side, I would still use an Event wrapper, as Roman Elizarov suggested. Multiple queued events can be supported by a SharedFlow with replay:
```kotlin
val eventFlow = MutableSharedFlow<Event<String>>(replay = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
fun sendEvent(element: String) = eventFlow.tryEmit(Event(element))
...
viewModel.eventFlow.collectIn(viewLifecycleOwner) { it.consume()?.let { Log.i(TAG, "event: $it") } }
```
The `DROP_OLDEST` strategy ensures consistent behavior between the "observer is suspended" case (e.g. when collected in `lifecycleScope.launchWhenStarted`) and the "no observer" case (then old events are always dropped).

`collectIn` is from this suggestion, but the specific collection technique does not make a real difference here.
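(For completeness, a minimal sketch of the kind of `Event` wrapper assumed here, roughly in the spirit of Jose Alcérreca's article; the exact implementation may differ:)

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

/** Wraps a value so it can be consumed at most once. A sketch, not the exact class used above. */
class Event<out T>(private val value: T) {
    private val consumed = AtomicBoolean(false)

    /** Returns the value the first time it is called, null afterwards. */
    fun consume(): T? = if (consumed.compareAndSet(false, true)) value else null

    /** Returns the value without marking it as consumed (e.g. for logging). */
    fun peek(): T = value
}
```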
Bonus points:
- secondary application-scoped observers (using `Event.peek()` instead of `consume()`) can be supported, e.g. for logging
- more complex scenarios may be implemented with `replayCache` and `subscriptionCount`: clearing errors, checking when the buffer is filled, posting a notification if the app is in background, etc.
Wow, you're right. I wish I had paid more attention to that comment.
I had to see the behaviour for myself in the playground: https://pl.kotl.in/0K5ON5dXp
As you mentioned, the event can get dropped due to receiver cancellation. It takes a few runs through that playground sample to prove it, but it does happen.
Some results from the playground test:
// "Normal" operation
ViewModel Sending...
1 Receiving
1 Received: Test
1 isActive: true
Cancelling first scope
2 Receiving
Cancelling second scope
// "Retried" operation via onUnDeliveredElement callback
ViewModel Sending...
1 Receiving
Cancelling first scope
Undelievered: Test
ViewModel Sending again...
2 Receiving
2 Received: Test
2 isActive: true
Cancelling second scope
ViewModel Sent again!
// "Normal" operation received on in-active coroutine.
// This is an interesting result. Any cancellation cooperation
// eg, even just calling `delay` could potentially mean
// an unprocessed event.
ViewModel Sending...
1 Receiving
Cancelling first scope
1 Received: Test
1 isActive: !!!!!!!!!!! false!
2 Receiving
Cancelling second scope
I feel like the last result adds yet another reason to go back to using an event wrapper. Though I'm not entirely certain what one should do when the event is "half" processed.
Maybe this third result (event received when the scope seems already cancelled) is related to calling `firstFragmentScope.cancel()` from a background thread. AFAIK, on Android scope cancellation happens on the main thread. So if you `collect` on the main thread too, the scope can't get cancelled out from under you concurrently with receiving an event. The main reason for dropping "atomic cancellation" for channels was to prevent issues like this one.

On the other hand, a `delay` between consuming and processing the event brings the risk of cancellation in between; this would be true for an "event wrapper" too.
It's unfortunate that undelivered elements are possible with channels. It means that today we still have no good replacement for the single live event (SLE) pattern. Using a shared flow of consumable events (SFCE) is the only alternative, but it's not ideal.
(Of course this doesn't matter if you don't care about lost elements.)
The big difference between SLE and SFCE is that the former is a publisher that publishes to one subscriber only, while the latter publishes to all subscribers but the event carries the consumed state. While SFCE is better in terms of separation of concerns, it does mean that all subscribers will have to remember to try-to-consume-if-not-consumed. That is enforced by the `Event` wrapper, but it's boilerplate for the subscribers.
Wouldn't it be nice to have a very specific flow subclass that manages this for you? Not sure if possible, but it should:
- Allow 0 or more subscribers (like a shared flow).
- Emit to 1 subscriber only (unlike a shared flow):
  - If 0 subscribers: buffer with a drop-oldest strategy.
  - If 1+ subscribers: guarantee that exactly 1 subscriber receives the value.
- Do not re-emit an element even if the number of subscribers goes from 1+ to 0 and back to 1+.

Would something like that be possible? E.g. through an operator extension on `SharedFlow`. It would simplify the observers: just observe the element, not some wrapped value.
It wouldn't support your 'Bonus points', but that's fine: the event wrapper can still be used for these cases.
Or maybe simpler: just expose the events like so (in my previous comment's simplified use case)?
```kotlin
private val _events = MutableSharedFlow<Event<E>>(/*..*/)
val events = _events.mapNotNull { it.unconsumedValueOrNull() }
```
Thanks for the suggestion!
As far as I could test, `mapNotNull` works correctly. Personally, I find consuming inside a "shared" flow (technically it is not a `SharedFlow` after `map`) a bit counter-intuitive.

I'm not sure if `consume()?.let { ... }` is boilerplate; I see it as a clear indication that we're dealing with events here, not some state. My main concern is that for `Event<Unit>` one can easily forget to add `consume()`; I've hit this a couple of times.

Having multiple primary (consuming) observers for a single flow is tricky, because it's hard to predict which one will process the event. Of course, if it doesn't matter, good for you. ;)

Nowadays, I prefer to expose a single `StateFlow<SomeViewState>` to the UI, including all possible events. The Event wrapper plays nicely with it.

But all this is a matter of taste. If `mapNotNull { it.consume() }` looks and works well for you, great! :)
The fact that `mapNotNull` returns a `Flow` and not a `SharedFlow` makes sense, because after the map the property of a shared flow may no longer hold:

> A hot `Flow` that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values.

(source)

In fact, a `Flow` might emit to zero or more collectors. That's not too generic, that's just versatile.
You can always create extensions for reuse, e.g.
```kotlin
fun <T : Any, V> SharedFlow<Event<T>>.mapNotNullEvents(transform: suspend (T) -> V) /* : Flow<V> */ =
    mapNotNull { it.unconsumedValueOrNull() }.map(transform)

fun <T : Any, V> SharedFlow<Event<T>>.mapEvents(transform: suspend (T) -> V) /* : Flow<V?> */ =
    map { it.unconsumedValueOrNull()?.let { value -> transform(value) } }

fun <T : Any> SharedFlow<Event<T>>.onEachEvent(action: suspend (T) -> Unit) /* : Flow<Event<T>> */ =
    onEach { it.unconsumedValueOrNull()?.let { value -> action(value) } }
```
It would've been nicer if `onEach` returned a `SharedFlow` instead of a `Flow`, as it preserves the broadcast property in the type signature.
So, for what it's worth, Google's updated their own guidance on this problem yet again: https://developer.android.com/jetpack/guide/ui-layer/events (I'm not quite sure when this updated guide was posted.)
From their guide:
> Note: In some apps, you might have seen ViewModel events being exposed to the UI using Kotlin Channels or other reactive streams. These solutions usually require workarounds such as event wrappers in order to guarantee that events are not lost and that they're consumed only once.
>
> Requiring workarounds is an indication that there's a problem with these approaches. The problem with exposing events from the ViewModel is that it goes against the state-down-events-up principle of Unidirectional Data Flow.
The TL;DR of it is for the view to call back to the view model when an event has been processed. This is in line with their recommendations for unidirectional data flow and feels very Compose-y. It ignores a lot of issues, in my opinion, but it is what it is.
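For illustration, a rough sketch of that pattern; names like `UiState`, `userMessages` and `userMessageShown` follow the spirit of the guide but are illustrative, not its exact code:

```kotlin
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

data class UiState(val userMessages: List<String> = emptyList())

class MessagesViewModel : ViewModel() {
    private val _uiState = MutableStateFlow(UiState())
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    fun showMessage(message: String) {
        _uiState.update { it.copy(userMessages = it.userMessages + message) }
    }

    // The UI calls this back once it has actually displayed the message,
    // so the "event" is modeled as state until it is explicitly cleared.
    fun userMessageShown(message: String) {
        _uiState.update { it.copy(userMessages = it.userMessages - message) }
    }
}
```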
@fergusonm Thanks for the update, I hadn't seen this article before.

It's good to see that Google is aware of the issue. I like UDF and try to use it as much as possible. From my point of view, the main problem is an "impedance mismatch" between the state-down-events-up principle and the event-driven classic Android UI.

Event wrappers and `userMessageShown()` are both workarounds for this mismatch, with the latter being much more verbose. The wrapper just encapsulates basically the same logic to avoid repetition.
Compose mostly alleviates this concern, e.g. AlertDialog is now just a composable and can be clearly modeled as part of state.
On the other hand, AFAIK, Navigation Compose is still event-based. Is this code guaranteed to be called only once?
```kotlin
LaunchedEffect(viewModel.uiState) {
    if (viewModel.uiState.isUserLoggedIn) {
        currentOnUserLogIn()
    }
}
```
Another point that bothers me is the suggestion to handle "UI behavior logic" directly in the UI, bypassing the ViewModel. This goes against the "single source of truth" principle. It also complicates handling configuration changes, which becomes a larger issue on Android 12.
BTW, one phrase caught my eye: "business logic remains the same for the same app on different mobile platforms". Is Google starting to push us towards KMP? ;)
> BTW, one phrase caught my eye: "business logic remains the same for the same app on different mobile platforms". Is Google starting to push us towards KMP? ;)
In 2024, your expectations are materializing.
On Android, we sometimes need to process events from our ViewModel exactly once. This is the so-called SingleLiveEvent case.
The aforementioned article recommends using LiveData with an Event wrapper. It works, but has some limitations.
Some third-party libraries emerged too. They support multiple observers, but at the cost of reduced reliability. One of them just drops events emitted during a configuration change. The other one enqueues events in this case, but only the first subscriber receives them afterwards.
Then comes the "new hotness": Kotlin Flow, especially its recently presented hot flow flavors (pun intended). StateFlow is very similar to LiveData, while SharedFlow, which by default does not replay (resembling good old PublishSubject from RxJava), looks promising for sending events. However, it has the same issue with reliability: events sent while there are no subscribers (e.g. the screen is being rotated) are immediately lost.
Roman Elizarov, father of Kotlin coroutines and flows, suggests using a unicast `Channel.receiveAsFlow()` for events to be processed exactly once (see "A use-case for channels" here). With this construct, "posted events are never dropped by default", he says. We can even use `Channel(capacity = Channel.UNLIMITED)` to be sure that buffer limits won't affect our events.

Unfortunately, there is a catch again. To be on the safe side (especially important for fragment transactions), we need to collect this Flow in an Activity or Fragment only between `onStart` and `onStop`. The easiest way to do it is the `lifecycleScope.launchWhenStarted { flow.collect {...} }` extension function. But due to the quirks of its implementation, events will be lost if emitted when the lifecycle is stopped, but not yet destroyed.

To achieve a complete victory we need a custom collector, which will unsubscribe on stop and subscribe again on start. Luckily, we can borrow one from this article by Patrick Steiger, despite the fact that he was solving a completely different problem with it. You can see this `Flow.collectWhileStarted()` above, slightly shrunk by me. It conveniently has the same syntax as `LiveData.observe()`.

But what about multicast?

It looks like multicast to an arbitrary number of pausable and recreatable observers with an "exactly once" semantics guarantee is fundamentally impossible. So if you really need to send the same event to N observers, you should create N channels and explicitly put N copies of your event in them (maybe using some collection of channels to avoid repetition).
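For illustration, a hypothetical sketch of the "N channels, N copies" idea; the class and function names are made up for this example:

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

// Each observer gets its own Channel; sendEvent puts a copy of the event into every one of them.
class MulticastEvents<T> {
    private val channels = CopyOnWriteArrayList<Channel<T>>()

    /** Each observer calls this once and collects the returned flow (e.g. with collectWhileStarted). */
    fun newFlow(): Flow<T> {
        val channel = Channel<T>(capacity = Channel.UNLIMITED)
        channels += channel
        return channel.receiveAsFlow()
    }

    fun sendEvent(element: T) {
        channels.forEach { it.trySend(element) }
    }
}
```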
Update:
This solution is not thread-safe, see details below.