Skip to content

Instantly share code, notes, and snippets.

@AWinterman
Last active September 13, 2023 11:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AWinterman/8516d4869f491176ebb270dafbb23199 to your computer and use it in GitHub Desktop.
Save AWinterman/8516d4869f491176ebb270dafbb23199 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
/**
 * Receives one chunk of at most [maxChunkSize] elements from [channel].
 *
 * Suspends until at least one element is available, then drains whatever else is already
 * buffered without suspending. Draining stops as soon as [maxChunkSize] elements have been
 * collected or the channel has nothing immediately available.
 *
 * @param channel the channel to read from.
 * @param maxChunkSize upper bound on the size of the returned chunk. The first element is
 * always received, so the result holds one element even when this is less than 1.
 * @return a non-empty list of the received elements, in the order they were received.
 * @throws kotlinx.coroutines.channels.ClosedReceiveChannelException if [channel] is closed
 * (without a cause) and has no elements left when this function is called or while it is
 * waiting for the first element.
 */
private suspend fun <T> getChunk(channel: Channel<T>, maxChunkSize: Int): List<T> {
    // Suspend until there is an element in the buffer; a chunk is never empty.
    val chunk = mutableListOf(channel.receive())
    // No more than maxChunkSize elements are retrieved.
    while (chunk.size < maxChunkSize) {
        // Non-suspending receive. Unlike the deprecated `poll()`, the result distinguishes
        // "nothing available" from a successfully received `null` element, so nullable
        // element types are not truncated or dropped.
        val next = channel.tryReceive()
        // Empty buffer or closed channel: emit what we have so far. A closed channel is
        // reported to the caller by the `receive()` of the next call.
        if (!next.isSuccess) break
        chunk.add(next.getOrThrow())
    }
    return chunk
}
/**
 * Groups the elements of this flow into lists of at most [maxSize] elements, preferring to
 * emit a smaller chunk early rather than wait for a full one.
 *
 * Upstream elements are collected concurrently into a buffer of [maxSize] elements; the
 * upstream is suspended while that buffer is full. Each emitted chunk contains whatever was
 * buffered at the time (at least one element), in upstream order.
 *
 * If [checkIntervalMillis] is specified, the flow suspends for [checkIntervalMillis] after
 * emitting each chunk to allow the buffer to fill.
 *
 * The returned flow is cold: every collection gets its own buffer and collects the upstream
 * again.
 *
 * @param maxSize maximum number of elements per chunk and capacity of the internal buffer;
 * must be positive.
 * @param checkIntervalMillis time in milliseconds to wait after each emitted chunk.
 * @return a flow of non-empty chunks that completes once the upstream completes and every
 * buffered element has been emitted.
 * @throws IllegalArgumentException if [maxSize] is not positive.
 */
fun <T> Flow<T>.chunked(maxSize: Int, checkIntervalMillis: Long = 0): Flow<List<T>> {
    // Non-positive values would be interpreted by Channel() as special capacities
    // (rendezvous / conflated / default buffer) rather than as a chunk size.
    require(maxSize > 0) { "maxSize must be positive, but was $maxSize" }
    return channelFlow {
        // Created per collection: a channel shared between collections would already be
        // closed when the flow is collected a second time.
        val buffer = Channel<T>(maxSize)
        coroutineScope {
            launch {
                this@chunked.collect {
                    // `send` will suspend if [maxSize] elements are currently in buffer
                    buffer.send(it)
                }
                buffer.close()
            }
            launch {
                while (true) {
                    val chunk = try {
                        getChunk(buffer, maxSize)
                    } catch (e: ClosedReceiveChannelException) {
                        // The upstream completed and the buffer is drained. Checking
                        // `isClosedForReceive` before receiving is racy: the channel can be
                        // closed while we are suspended waiting for the first element.
                        break
                    }
                    this@channelFlow.send(chunk)
                    delay(checkIntervalMillis)
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment