There is a whole set of potential Flow operators that maintain some kind of state for each active collection, for example Flow.withIndex and conceptually similar operators. The proposal is to introduce a generic withState operator to simplify the implementation of all of them:
fun <T, R> Flow<T>.withState(builder: suspend FlowScope<T>.() -> Flow<R>): Flow<R>
where FlowScope<T> is an interface extending both CoroutineScope and Flow<T>.
The specified builder is called for each collection of the flow and can capture and use any kind of state specific to that collection in any kind of flow transformation. The scope of the builder is active while the flow collection is active.
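To make these semantics concrete, here is a minimal sketch of how such an operator could be layered on top of the existing flow { ... } and coroutineScope { ... } primitives. It is only an illustration under stated assumptions, not the proposed implementation: FlowScopeImpl and withIndexViaState are hypothetical names, and the sketch implements Flow by delegation even though the library documents the Flow interface as not stable for inheritance.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The interface from the proposal: both a scope and the upstream flow.
interface FlowScope<T> : CoroutineScope, Flow<T>

// Hypothetical helper: delegates scope and flow behaviour to existing instances.
private class FlowScopeImpl<T>(
    scope: CoroutineScope,
    upstream: Flow<T>
) : FlowScope<T>, CoroutineScope by scope, Flow<T> by upstream

// Sketch only: one possible way to get the described behaviour.
fun <T, R> Flow<T>.withState(builder: suspend FlowScope<T>.() -> Flow<R>): Flow<R> {
    val upstream = this
    return flow {
        // A fresh scope (and a fresh builder invocation) for every collection.
        coroutineScope {
            val flowScope = FlowScopeImpl(this, upstream)
            emitAll(flowScope.builder())
            // The flow has completed: cancel helper coroutines that are still running.
            coroutineContext.cancelChildren()
        }
    }
}

// Usage, named differently to avoid clashing with the library's own withIndex.
fun <T> Flow<T>.withIndexViaState(): Flow<IndexedValue<T>> = withState {
    var index = 0 // state private to a single collection
    map { value -> IndexedValue(index++, value) }
}
```

Because the builder runs inside flow { ... }, each collection gets its own index variable, so collecting the resulting flow twice yields indices starting from 0 both times.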
For example, withIndex can be implemented via withState in the following way:
fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = withState {
    var index = 0
    map { value -> IndexedValue(index++, value) }
}
Other similar operators can be written. For example, we can measure how long it took each element to arrive since the start of the collection:
fun <T> Flow<T>.withTime(): Flow<TimedValue<T>> = withState {
    val start = MonoClock.markNow()
    map { value -> TimedValue(value, start.elapsedNow()) }
}
However, this is not what makes the withState operation really powerful. It provides the ability to maintain complex state, perform suspending operations, and launch helper coroutines.
Complex state can be maintained inside the withState builder block, and any flow transformation operator can use the enclosed state. For example, we can easily compute the average time it took each item to be emitted and inject it at the end of the stream:
sealed class Metered<out T> {
    data class Value<T>(val value: T) : Metered<T>()
    data class Average(val duration: Duration) : Metered<Nothing>()
}
fun <T> Flow<T>.metered(): Flow<Metered<T>> = withState {
    val start = MonoClock.markNow()
    var count = 0
    map {
        count++
        Metered.Value(it) as Metered<T>
    }.onCompletion {
        emit(Metered.Average(start.elapsedNow() / count))
    }
}
Suspending operations can be performed directly inside the builder block. For example, some configuration can be loaded from a remote system on each collection of the flow:
flow.withState {
    val configuration = fetchConfigurationFromDB() // suspending
    map { value ->
        computeSomething(value, configuration)
    }
}
The withState operator provides a CoroutineScope that can be used to launch helper coroutines that are active only in the scope of the active flow and are all cancelled when the flow completes. This automatic cancellation is designed to aid in gathering additional state that is needed only while the flow is being collected and is not needed otherwise (so it is not designed for side-effecting coroutines).
For example, the configuration example above can be improved so that we don't wait for the configuration to be fetched before starting to collect the upstream flow, and so that the fetch from the database is cancelled if the upstream flow completes without emitting anything:
flow.withState {
    val configuration = async { fetchConfigurationFromDB() } // helper loader
    map { value ->
        computeSomething(value, configuration.await()) // only await when needed
    }
}
In fact, it is not very hard to write these kinds of operators even without withState. Simple transformations can use the flow { ... } builder. For example:
fun <T> Flow<T>.withTime(): Flow<TimedValue<T>> = flow {
    val start = MonoClock.markNow()
    collect { value -> emit(TimedValue(value, start.elapsedNow())) }
}
It has the collect { value -> emit(....) } boilerplate, but it is not excessively verbose.
Alternatively, the following implementation does not repeat the implementation of the map operator:
fun <T> Flow<T>.withTime(): Flow<TimedValue<T>> = flow {
    val start = MonoClock.markNow()
    emitAll(map { value -> TimedValue(value, start.elapsedNow()) })
}
This approach is also possible with operators like metered:
fun <T> Flow<T>.metered(): Flow<Metered<T>> = flow {
    val start = MonoClock.markNow()
    var count = 0
    emitAll(map {
        count++
        Metered.Value(it) as Metered<T>
    }.onCompletion {
        emit(Metered.Average(start.elapsedNow() / count))
    })
}
However, this approach works well only in the definition of a new operator and does not chain well into a sequence of flow transformations.
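To illustrate the chaining problem, here is a small sketch of what adding per-collection state in the middle of a chain looks like with only the flow { ... } builder. The function name indexedAfterFilter and the sample values are made up for the illustration. Inside flow { ... } the receiver is the collector rather than the upstream flow, so the chain has to be interrupted (here with let) to give the upstream a name:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical example: index the elements that survive a filter.
fun indexedAfterFilter(): Flow<IndexedValue<String>> =
    flowOf("a", "b", "c")
        .filter { it != "b" }
        .let { upstream ->
            // The chain is broken here: the upstream must be captured explicitly.
            flow {
                var index = 0 // per-collection state
                emitAll(upstream.map { value -> IndexedValue(index++, value) })
            }
        }

fun main() = runBlocking {
    indexedAfterFilter().collect { println(it) }
}
```

With the proposed operator, the same step would presumably read as a plain .withState { ... } call in the chain, with the upstream available as the receiver.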
There does not seem to be a concise alternative for helper coroutines, and managing their cancellation is tedious to write manually, which makes them the most compelling use-case for withState.
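For comparison, a hand-written version of the asynchronous configuration example might look roughly like the sketch below. It is one possible approach, not the only one. The bodies of fetchConfigurationFromDB and computeSomething are placeholder stand-ins, and computeWithConfiguration is a hypothetical operator name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Placeholder stand-ins for the functions used in the examples above.
suspend fun fetchConfigurationFromDB(): String {
    delay(100) // pretend this is a slow remote call
    return "cfg"
}

fun computeSomething(value: Int, configuration: String): String = "$configuration:$value"

// Hypothetical operator written without withState.
fun Flow<Int>.computeWithConfiguration(): Flow<String> {
    val upstream = this
    return flow {
        // A scope is needed to launch the helper coroutine for this collection.
        coroutineScope {
            val configuration = async { fetchConfigurationFromDB() }
            try {
                emitAll(upstream.map { value ->
                    computeSomething(value, configuration.await()) // only await when needed
                })
            } finally {
                // Without this, an empty upstream would still wait for the fetch to finish.
                configuration.cancel()
            }
        }
    }
}
```

The scope creation, the explicit capture of the upstream, and the try/finally cancellation are exactly the ceremony that a withState-style operator would absorb.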