Skip to content

Instantly share code, notes, and snippets.

@octylFractal
Created November 10, 2020 05:40
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save octylFractal/e95df0727354b9d6b76f2ede3cae8b89 to your computer and use it in GitHub Desktop.
/**
 * Buffer either up to [count] items or as long as [maxDuration] since the first item,
 * then emit a list containing those items. Never emits an empty list.
 *
 * A chunk is flushed when it holds [count] items, when [maxDuration] has elapsed since its first
 * item was buffered, or when the upstream flow completes while items are still pending.
 * If the upstream flow fails, the failure is propagated and any pending items are dropped.
 *
 * Chunks are delivered through the channel backing the returned flow, so they are subject to that
 * channel's buffering.
 *
 * @param count maximum number of items in an emitted list; must be positive
 * @param maxDuration maximum time a non-empty buffer is held before it is emitted
 * @return a flow of non-empty lists, each containing at most [count] items in upstream order
 * @throws IllegalArgumentException if [count] is not positive
 */
@OptIn(ObsoleteCoroutinesApi::class)
fun <T> Flow<T>.bufferChunks(count: Int, maxDuration: Duration): Flow<List<T>> {
    require(count > 0) { "count must be positive, was $count" }
    // Chunks are produced by the actor coroutine below. A plain `flow { }` collector may only be
    // used from the collecting coroutine, so a channel-backed flow is required here.
    // The builder is referenced by its fully qualified name so no extra import is needed.
    return kotlinx.coroutines.flow.channelFlow {
        val downstream = this
        val emitter = actor<BufferSignal<T>> {
            val signals = channel
            // Timer for the chunk currently being filled, if any.
            var emitJob: Job? = null
            // Number of chunks emitted so far; identifies which chunk a timer belongs to.
            var bufferCount = 0
            val buffer = ArrayList<T>(count)

            // Sends the current buffer downstream and resets all per-chunk state.
            suspend fun doEmitBuffer() {
                emitJob?.cancel()
                emitJob = null
                downstream.send(buffer.toList())
                buffer.clear()
                bufferCount++
            }

            for (signal in signals) {
                exhaustive(when (signal) {
                    is BufferSignal.Add -> {
                        buffer.add(signal.item)
                        // Check the size limit first so that count == 1 flushes immediately
                        // instead of only ever starting a timer.
                        if (buffer.size >= count) {
                            doEmitBuffer()
                        } else if (buffer.size == 1) {
                            val countAtStart = bufferCount
                            // setup job to emit in given duration
                            emitJob = downstream.launch {
                                delay(maxDuration)
                                try {
                                    signals.send(BufferSignal.Emit(countAtStart))
                                } catch (e: kotlinx.coroutines.channels.ClosedSendChannelException) {
                                    // Upstream already completed and the mailbox is closed; the
                                    // actor flushes any remaining items itself when its loop ends.
                                }
                            }
                        }
                        Unit
                    }
                    is BufferSignal.Emit -> {
                        // A timer whose chunk was already emitted carries an outdated counter.
                        val isStale = signal.count != null && signal.count != bufferCount
                        if (!isStale && buffer.isNotEmpty()) {
                            doEmitBuffer()
                        }
                        Unit
                    }
                })
            }
            // Mailbox closed: upstream is done, flush whatever is left.
            if (buffer.isNotEmpty()) {
                doEmitBuffer()
            }
        }
        this@bufferChunks.collect {
            emitter.send(BufferSignal.Add(it))
        }
        emitter.close()
    }
}
/**
 * Messages processed by the buffering actor inside [bufferChunks].
 */
private sealed class BufferSignal<T> {
    /** Carries a single upstream [item] to append to the current buffer. */
    data class Add<T>(val item: T) : BufferSignal<T>()

    /**
     * Requests that the current buffer be emitted.
     *
     * When [count] is non-null the receiver compares it with its own emitted-buffer counter and
     * ignores the request if they differ; `null` (the default) skips that comparison.
     */
    class Emit<T>(val count: Int? = null) : BufferSignal<T>()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment