Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active October 16, 2020 20:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fluidsonic/4330ebda9942c57a6660fadba1145362 to your computer and use it in GitHub Desktop.
Save fluidsonic/4330ebda9942c57a6660fadba1145362 to your computer and use it in GitHub Desktop.
Fast flow combination
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Unique sentinel object marking a slot in the combined-values list that has not yet
// received a value. It is compared by identity (`===`), so it cannot collide with
// any value (including `null`) produced by a source flow.
private val undefined = Any()
/**
 * Combines the latest values of all flows in this iterable into a flow of lists.
 *
 * Delegates to the transforming `combineLatest` overload with an identity transform,
 * so each emitted list is passed through unchanged.
 *
 * @return a flow emitting the list of latest values, ordered like the source flows.
 */
internal fun <T> Iterable<Flow<T>>.combineLatest(): Flow<List<T>> =
    combineLatest { latestValues -> latestValues }
/**
 * Combines the latest values of all flows in this iterable and maps each combination
 * through [transform].
 *
 * Every source flow is collected concurrently in its own coroutine; each value it
 * produces is tagged with the index of its source and funnelled through a single
 * `channelFlow`. Nothing is emitted until every source has delivered at least one
 * value. After that, each incoming value triggers a new call to [transform] with the
 * current list of latest values (ordered like the source flows).
 *
 * If this iterable contains no flows, the resulting flow emits nothing.
 *
 * @param transform suspending mapper applied to each list of latest values.
 * @return a flow of the results of [transform].
 */
internal fun <T, R> Iterable<Flow<T>>.combineLatest(transform: suspend (value: List<T>) -> R): Flow<R> = flow {
    // A source value paired with the position of the flow that produced it.
    class Emission(@JvmField val position: Int, @JvmField val payload: T)

    val sources = toList()

    // Number of sources that have not delivered their first value yet.
    var pending = sources.size

    // Latest value per source; slots still waiting for a first value hold `undefined`.
    var latest: MutableList<Any?> = MutableList(pending) { undefined }

    val merged = channelFlow {
        coroutineScope {
            for ((position, source) in sources.withIndex()) {
                launch {
                    // collectLatest cancels a still-running send when the same source
                    // produces a newer value.
                    source.collectLatest { payload -> send(Emission(position, payload)) }
                }
            }
        }
    }

    merged.collect { emission ->
        val position = emission.position

        if (pending == 0) {
            // The current list was already handed to `transform`; continue on a copy
            // so that list is not modified afterwards.
            latest = latest.toMutableList()
        } else if (latest[position] === undefined) {
            // First value from this source.
            pending -= 1
        }

        latest[position] = emission.payload

        if (pending == 0) {
            // Every slot now holds a real T, so the cast is safe.
            @Suppress("UNCHECKED_CAST")
            emit(transform(latest as List<T>))
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment