Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Throttling flow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
/**
 * Groups upstream elements into lists, emitting at most one list per [wait] milliseconds.
 *
 * Every upstream element is buffered. When an element arrives and at least [wait]
 * milliseconds have passed since the previous emission (or nothing has been emitted
 * yet), the whole buffer is emitted as one list and then cleared. The very first
 * element is therefore emitted immediately as a single-element list.
 *
 * Emissions are only triggered by the arrival of an upstream element; no timer runs in
 * the background. When the upstream completes normally, any elements still buffered
 * are emitted as a final list so that no data is lost. If collection fails or is
 * cancelled, the buffered remainder is not emitted.
 *
 * @param wait minimum interval between emissions, in milliseconds. Zero or a negative
 *   value results in every element being emitted on its own.
 * @return a flow of non-empty lists preserving the upstream element order.
 */
fun <T> Flow<T>.throttleByTime(wait: Long): Flow<List<T>> = flow {
    // Saturating conversion: a huge `wait` clamps to Long.MAX_VALUE instead of overflowing.
    val waitNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(wait)
    val pending = mutableListOf<T>()
    var emittedOnce = false
    var lastEmitNanos = 0L
    collect { value ->
        pending.add(value)
        // nanoTime is monotonic, so wall-clock adjustments cannot stall or burst the throttle.
        val now = System.nanoTime()
        // Compare elapsed time by subtraction: nanoTime has an arbitrary origin and may be negative.
        if (!emittedOnce || now - lastEmitNanos >= waitNanos) {
            emit(pending.toList())
            pending.clear()
            lastEmitNanos = now
            emittedOnce = true
        }
    }
    // Upstream finished: flush whatever arrived after the last emission.
    if (pending.isNotEmpty()) {
        emit(pending.toList())
    }
}
/**
 * Groups upstream elements into lists of [limit] elements each.
 *
 * Elements are buffered in arrival order; as soon as the buffer holds [limit]
 * elements it is emitted as one list and cleared. When the upstream completes
 * normally, a final list with the remaining elements (fewer than [limit]) is emitted
 * if the buffer is not empty, so that no data is lost. If collection fails or is
 * cancelled, the buffered remainder is not emitted.
 *
 * @param limit number of elements per emitted list. Zero or a negative value results
 *   in every element being emitted on its own.
 * @return a flow of non-empty lists preserving the upstream element order.
 */
fun <T> Flow<T>.throttleByCount(limit: Long): Flow<List<T>> = flow {
    val pending = mutableListOf<T>()
    collect { value ->
        pending.add(value)
        // The buffer size is the element count since the last emission.
        if (pending.size >= limit) {
            emit(pending.toList())
            pending.clear()
        }
    }
    // Upstream finished: flush the trailing partial batch.
    if (pending.isNotEmpty()) {
        emit(pending.toList())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.