Skip to content

Instantly share code, notes, and snippets.

@Kondenko
Created June 15, 2019 20:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Kondenko/f8d12923e624fa097ac539b77139fb1c to your computer and use it in GitHub Desktop.
Save Kondenko/f8d12923e624fa097ac539b77139fb1c to your computer and use it in GitHub Desktop.
Kotlin Flow Extensions
/**
 * Groups upstream values into lists of [capacity] elements, preserving order.
 *
 * Every emitted list holds exactly [capacity] values, except possibly the last
 * one: when the upstream completes with a partially filled buffer, the
 * remaining values are emitted as a shorter list so no value is lost.
 * An empty upstream produces no emissions.
 *
 * The buffer is created per collection, so collecting the returned flow
 * several times (or concurrently) never shares state between collectors.
 *
 * @param capacity number of values per emitted list; must be positive.
 * @return a flow of consecutive, non-overlapping chunks of the upstream values.
 * @throws IllegalArgumentException if [capacity] is not positive.
 */
@ExperimentalCoroutinesApi
fun <T> Flow<T>.bufferList(capacity: Int): Flow<List<T>> {
    require(capacity > 0) { "Buffer size should be positive, but was $capacity" }
    // Fully-qualified builder so this block needs no additional import.
    return kotlinx.coroutines.flow.flow {
        var buffer = ArrayList<T>()
        this@bufferList.collect { value ->
            // Store the value first so the element that fills the buffer is kept.
            buffer.add(value)
            if (buffer.size == capacity) {
                emit(buffer)
                // Start a fresh list rather than clearing: the emitted list now
                // belongs to downstream and must not be mutated afterwards.
                buffer = ArrayList()
            }
        }
        // Flush whatever is left once the upstream completes.
        if (buffer.isNotEmpty()) emit(buffer)
    }
}
/**
 * Collects this flow, holding values back until at least [count] of them
 * have arrived.
 *
 * While fewer than [count] values have been received they are buffered and
 * [action] is not called. As soon as the buffer reaches [count] values
 * (including the value that just arrived), [action] is invoked once with the
 * whole buffer. From then on every subsequent value is delivered immediately
 * as a single-element list.
 *
 * If the flow completes before [count] values arrive, [action] is never
 * invoked. A [count] of `0` or `1` delivers every value as it arrives.
 *
 * @param count minimum number of values required before the first delivery;
 *   must be non-negative.
 * @param action callback receiving the initial batch and then each later value.
 * @throws IllegalArgumentException if [count] is negative.
 */
@InternalCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.collectWhenAtLeast(
    count: Int,
    action: (List<T>) -> Unit
) {
    require(count >= 0) { "Buffer size should be non-negative, but was $count" }
    // Non-null while still accumulating; set to null once the threshold is hit.
    var pending: MutableList<T>? = ArrayList()
    collect { value ->
        val buffered = pending
        if (buffered == null) {
            // Threshold already reached: pass values straight through.
            action(listOf(value))
        } else {
            // Add before checking so the value that reaches the threshold is
            // delivered with the batch instead of being dropped.
            buffered.add(value)
            if (buffered.size >= count) {
                pending = null
                action(buffered)
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment