Skip to content

Instantly share code, notes, and snippets.

@psteiger
Last active April 8, 2023 21:39
Show Gist options
  • Save psteiger/7c48daf5d669469ff0b9d28bb842d8fa to your computer and use it in GitHub Desktop.
Save psteiger/7c48daf5d669469ff0b9d28bb842d8fa to your computer and use it in GitHub Desktop.
Accumulating Kotlin Flow
/**
 * Gates upstream flow emissions by [passThrough].
 *
 * When upstream flow emits a value, either emits the value downstream if the last value emitted by [passThrough]
 * was `true`, or accumulates it for later emission if `false`. Accumulated values are emitted once [passThrough]
 * emits `true`.
 *
 * Values are delivered downstream in batches: each emitted list contains the value that started the batch plus
 * every value that was already buffered at the moment the gate was found open. `null` elements (for a nullable
 * [T]) are ordinary values and are batched like any other element.
 *
 * @param passThrough the gate flow; `true` lets values through, `false` holds them back.
 * @return a flow of non-empty batches of upstream values, in upstream order.
 * @throws IllegalStateException if [passThrough] completes without the gate being open while values are
 * still waiting to be emitted. A failure of [passThrough] itself is rethrown as is.
 */
@OptIn(FlowPreview::class) // produceIn is stable since kotlinx-coroutines 1.7.0-Beta
internal fun <T> Flow<T>.accumulateBy(passThrough: Flow<Boolean>): Flow<List<T>> = flow {
    coroutineScope {
        val valueChannel = produceIn(this)
        // Completion of the gate is materialized into a value so that it survives `stateIn` (a StateFlow never
        // completes on its own) and is turned back into a real completion/failure for the collector below.
        val passThroughFlow = passThrough.materializeCompletion().stateIn(this).dematerializeCompletion()
        while (true) {
            // Inspect the ChannelResult itself instead of `getOrNull()`: a successfully received `null`
            // element must not be mistaken for a closed channel.
            val firstResult = valueChannel.receiveCatching()
            if (!firstResult.isSuccess) break // not a success -> upstream value flow completed
            val valuesToEmit = mutableListOf(firstResult.getOrThrow())
            // Suspend until the gate is open. `firstOrNull` yields null when the gate flow completed
            // without a `true` value being observed.
            // NOTE: `valueChannel.toList()` suspends until the upstream value flow completes.
            passThroughFlow.firstOrNull { it } ?: error(
                "Gate flow terminated with passThrough != true. Unsent elements: ${valuesToEmit + valueChannel.toList()}"
            )
            // Drain everything that is already buffered, without suspending.
            while (true) {
                val nextResult = valueChannel.tryReceive()
                if (!nextResult.isSuccess) break // empty or closed -> consumed all upstream values at this point in time
                valuesToEmit.add(nextResult.getOrThrow())
            }
            emit(valuesToEmit.toList())
        }
        // The `stateIn` sharing coroutine never finishes by itself; cancel it so that coroutineScope can return.
        coroutineContext.cancelChildren()
    }
}
/**
 * Turns the termination of this flow into a regular element.
 *
 * Every upstream element is wrapped in [Value]. When the upstream completes normally a `Completion(null)` is
 * emitted; when the upstream fails, the failure is emitted as `Completion(cause)` instead of being thrown.
 *
 * Only upstream failures are materialized. Exceptions thrown by the downstream collector and cancellation of
 * the collecting coroutine are propagated untouched, so no element is ever emitted after the collector failed
 * or was cancelled.
 */
private fun <T> Flow<T>.materializeCompletion(): Flow<ValueOrCompletion<T>> =
    map<T, ValueOrCompletion<T>> { Value(it) }
        // `cause` is null only on normal completion; failures are handled by `catch` below.
        .onCompletion { cause -> if (cause == null) emit(Completion(null)) }
        // `catch` only sees upstream exceptions, which keeps the flow exception-transparent.
        .catch { cause -> emit(Completion(cause)) }
/**
 * Inverse of [materializeCompletion]: unwraps [Value] elements and ends the flow at the first [Completion].
 *
 * A [Completion] carrying a throwable makes the resulting flow fail with that throwable; a [Completion]
 * without one makes it complete normally. Nothing after the first [Completion] is collected.
 */
private fun <T> Flow<ValueOrCompletion<T>>.dematerializeCompletion(): Flow<T> = transformWhile { signal ->
    when (signal) {
        is Completion -> {
            val failure = signal.throwable
            if (failure != null) throw failure
            false // terminal signal: stop collecting
        }
        is Value -> {
            emit(signal.value)
            true // keep collecting
        }
    }
}
/** A flow signal: either a regular element ([Value]) or the terminal event of the flow ([Completion]). */
private sealed interface ValueOrCompletion<out T>
/** Wraps a regular element [value] of the original flow. */
private data class Value<T>(val value: T) : ValueOrCompletion<T>
/** Marks the end of the original flow; [throwable] is the failure it ended with, or `null` if there was none. */
private data class Completion(val throwable: Throwable?) : ValueOrCompletion<Nothing>
// test
@Test
fun testAccumulateBy() = runTest {
    // Gate timeline in virtual time: open at 0 ms, closed at 100 ms, open again at 200 ms.
    val gate = flow {
        emit(true)
        delay(100)
        emit(false)
        delay(100)
        emit(true)
    }
    // Values 3 and 5 are each held back for 100 ms before being passed on.
    val numbers = (1..10).asFlow().onEach { number ->
        if (number == 3 || number == 5) {
            delay(100)
            runCurrent()
        }
    }
    val batches = numbers
        .accumulateBy(gate)
        .flowOn(UnconfinedTestDispatcher(testScheduler))
        .toList()
    val expected = listOf(
        listOf(1),
        listOf(2),
        listOf(3),
        listOf(4, 5),
        listOf(6),
        listOf(7),
        listOf(8, 9, 10),
    )
    assertThat(batches).isEqualTo(expected)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment