@elizarov
Created December 26, 2019 10:00
Flow.withState Design

Introduction

There's a whole set of potential Flow operators that maintain some kind of state for each active collection. Flow.withIndex and conceptually similar operators are examples.
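To make "state for each active collection" concrete, here is a small sketch of what goes wrong when the state lives outside the collection. The withSharedIndex operator is hypothetical and exists only for this illustration; withIndex is the existing operator from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator, for illustration only: the counter is created once,
// when the operator is applied, so every collection shares and mutates it.
fun <T> Flow<T>.withSharedIndex(): Flow<IndexedValue<T>> {
    var index = 0
    return map { value -> IndexedValue(index++, value) }
}

fun main() = runBlocking {
    val shared = flowOf("a", "b").withSharedIndex()
    println(shared.map { it.index }.toList()) // [0, 1]
    println(shared.map { it.index }.toList()) // [2, 3], the counter leaked between collections

    // The built-in withIndex keeps its counter per collection.
    val perCollection = flowOf("a", "b").withIndex()
    println(perCollection.map { it.index }.toList()) // [0, 1]
    println(perCollection.map { it.index }.toList()) // [0, 1]
}
```

Operators of this kind need their state to be created anew each time the flow is collected, which is the problem the rest of this note is about.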

Proposal

The proposal is to introduce a generic withState operator to simplify the implementation of all of them:

fun <T, R> Flow<T>.withState(builder: suspend FlowScope<T>.() -> Flow<R>): Flow<R>

where FlowScope<T> is an interface extending both CoroutineScope and Flow<T>.

The specified builder is invoked once for each collection of the flow. It can capture any kind of state specific to that collection and use it in any kind of flow transformation. The scope of the builder is active while the flow collection is active.
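The semantics described above can be approximated with existing kotlinx.coroutines primitives. The following is only a rough sketch under stated assumptions, not the proposed implementation: FlowScopeImpl is a hypothetical helper, and cancelling the helper coroutines in a finally block is one possible reading of "cancelled when the flow completes".

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The interface named in the proposal; it is not part of kotlinx.coroutines.
interface FlowScope<T> : CoroutineScope, Flow<T>

// Hypothetical helper: delegates scope members and collection to the given instances.
private class FlowScopeImpl<T>(
    scope: CoroutineScope,
    upstream: Flow<T>
) : FlowScope<T>, CoroutineScope by scope, Flow<T> by upstream

fun <T, R> Flow<T>.withState(builder: suspend FlowScope<T>.() -> Flow<R>): Flow<R> {
    val upstream = this
    return flow {
        // The flow { } body runs once per collection, so anything the builder
        // creates is state that belongs to this collection only.
        coroutineScope {
            val scope: CoroutineScope = this
            val flowScope = FlowScopeImpl(scope, upstream)
            try {
                emitAll(flowScope.builder())
            } finally {
                // Assumption: helper coroutines still running at the end are cancelled.
                scope.coroutineContext.cancelChildren()
            }
        }
    }
}

fun main() = runBlocking {
    val indexed = flowOf("a", "b").withState {
        var index = 0
        map { value -> IndexedValue(index++, value) }
    }
    println(indexed.toList()) // indices start from 0
    println(indexed.toList()) // and start from 0 again on the second collection
}
```

Emitting from inside coroutineScope is permitted by the flow context-preservation rules, which is what lets the builder's scope wrap the whole collection.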

Use-cases

For example, withIndex can be implemented via withState in the following way:

fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = withState {
    var index = 0
    map { value -> IndexedValue(index++, value) }
}

Other similar operators can be written. For example, we can measure how long it took each element to arrive since the start of the collection:

fun <T> Flow<T>.withTime(): Flow<TimedValue<T>> = withState {
    val start = MonoClock.markNow()
    map { value -> TimedValue(value, start.elapsedNow()) }
}

However, this is not what makes the withState operation really powerful. It provides the ability to maintain complex state, perform suspending operations, and launch helper coroutines.

Complex state

Complex state can be maintained inside the withState builder block, and any flow transformation operators can use the enclosed state. For example, we can easily compute the average time it took each item to be emitted and inject it at the end of the stream:

sealed class Metered<out T> {
    data class Value<T>(val value: T) : Metered<T>()
    data class Average(val duration: Duration) : Metered<Nothing>()
}

fun <T> Flow<T>.metered(): Flow<Metered<T>> = withState {
    val start = MonoClock.markNow()
    var count = 0
    map {
        count++
        Metered.Value(it) as Metered<T>
    }.onCompletion {
        emit(Metered.Average(start.elapsedNow() / count))
    }
}

Suspending operations

The builder block can also perform suspending operations. For example, some configuration can be loaded from a remote system on each collection of the flow:

flow.withState {
    val configuration = fetchConfigurationFromDB() // suspending
    map { value ->
        computeSomething(value, configuration)
    }
}

Helper coroutines

The withState operator provides a CoroutineScope that can be used to launch helper coroutines that are active only while the flow is being collected and are all cancelled when the flow completes. This automatic cancellation is designed to aid in gathering additional state that is needed only while the flow is being collected and is not needed otherwise (so it is not designed for side-effecting coroutines). For example, the above example with configuration can be improved so that we don't wait for the configuration to be fetched before starting to collect the upstream flow, and so that the fetch from the database is cancelled if the upstream flow completes without emitting anything:

flow.withState {
    val configuration = async { fetchConfigurationFromDB() } // helper loader
    map { value ->
        computeSomething(value, configuration.await()) // only await when needed
    }
}

Alternatives

In fact, it is not very hard to write these kinds of operators even without withState. Simple transformations can use the flow { ... } builder. For example:

fun <T> Flow<T>.withTime(): Flow<TimedValue<T>> = flow {
    val start = MonoClock.markNow()
    collect { value -> emit(TimedValue(value, start.elapsedNow())) }
}

It has the collect { value -> emit(....) } boilerplate, but it is not excessively verbose.

Alternatively, the following implementation does not repeat the implementation of the map operator:

fun <T> Flow<T>.withTime(): Flow<TimedValue<T>> = flow {
    val start = MonoClock.markNow()
    emitAll(map { value -> TimedValue(value, start.elapsedNow()) })
}

This approach is also possible with operators like metered:

fun <T> Flow<T>.metered(): Flow<Metered<T>> = flow {
    val start = MonoClock.markNow()
    var count = 0
    emitAll(map {
        count++
        Metered.Value(it) as Metered<T>
    }.onCompletion {
        emit(Metered.Average(start.elapsedNow() / count))
    })
}

However, this approach works well only in the definition of a new operator and does not chain well into a sequence of flow transformations.
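To illustrate the chaining problem, here is a sketch of a stateful step placed in the middle of a chain using only existing operators. The evenRunningSums function and its running-sum logic are made up for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical pipeline: keep even numbers, emit their running sums, take the first two.
fun evenRunningSums(source: Flow<Int>): Flow<Int> =
    source
        .filter { it % 2 == 0 }
        // The flow { } builder has no upstream receiver, so the chain has to be
        // interrupted with let { } (or a local variable) to name the upstream flow.
        .let { upstream ->
            flow {
                var sum = 0 // per-collection state
                emitAll(upstream.map { value ->
                    sum += value
                    sum
                })
            }
        }
        .take(2)

fun main() = runBlocking {
    println(evenRunningSums(flowOf(1, 2, 3, 4, 5, 6)).toList()) // [2, 6]
}
```

The extra let { upstream -> flow { emitAll(...) } } nesting is the part that a chainable operator would remove.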

There does not seem to be a concise alternative for helper coroutines, and managing their cancellation manually is tedious, which makes helper coroutines the most compelling use-case for withState.
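For comparison, the manual bookkeeping might look roughly like the sketch below. It assumes Int inputs and String outputs, and fetchConfigurationFromDB and computeSomething are hypothetical stand-ins with made-up bodies; computeWithConfiguration is likewise a name invented for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for fetchConfigurationFromDB and computeSomething.
suspend fun fetchConfigurationFromDB(): String {
    delay(100) // simulates a slow remote call
    return "cfg"
}

fun computeSomething(value: Int, configuration: String): String = "$configuration:$value"

fun Flow<Int>.computeWithConfiguration(): Flow<String> {
    val upstream = this
    return flow {
        coroutineScope {
            // Start loading in the background without delaying upstream collection.
            val configuration = async { fetchConfigurationFromDB() }
            try {
                upstream.collect { value ->
                    emit(computeSomething(value, configuration.await()))
                }
            } finally {
                // Without this, coroutineScope would wait for the fetch to finish
                // even when the upstream flow completes without emitting anything.
                configuration.cancel()
            }
        }
    }
}

fun main() = runBlocking {
    println(flowOf(1, 2).computeWithConfiguration().toList())     // [cfg:1, cfg:2]
    println(emptyFlow<Int>().computeWithConfiguration().toList()) // []
}
```

Every operator that needs a helper coroutine has to repeat the coroutineScope, try/finally, and cancel steps by hand.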
