/BufferChunk.kt Secret
Created
November 10, 2020 05:40
Star
You must be signed in to star a gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Buffer either up to [count] items or as long as [maxDuration] since the first item, | |
* then emit a list containing those items. Never emits an empty list. | |
*/ | |
@OptIn(ObsoleteCoroutinesApi::class) | |
fun <T> Flow<T>.bufferChunks(count: Int, maxDuration: Duration): Flow<List<T>> { | |
return flow { | |
coroutineScope { | |
val flowScope = this | |
val emitter = actor<BufferSignal<T>> { | |
var emitJob: Job? = null | |
var bufferCount = 0 | |
val buffer = ArrayList<T>(count) | |
suspend fun doEmitBuffer() { | |
emitJob?.cancel() | |
emitJob = null | |
emit(buffer.toList()) | |
buffer.clear() | |
bufferCount++ | |
} | |
for (signal in channel) { | |
exhaustive(when (signal) { | |
is BufferSignal.Add -> { | |
buffer.add(signal.item) | |
if (buffer.size == 1) { | |
val countAtStart = bufferCount | |
// setup job to emit in given duration | |
emitJob = flowScope.launch { | |
delay(maxDuration) | |
channel.send(BufferSignal.Emit(countAtStart)) | |
} | |
} else if (buffer.size == count) { | |
doEmitBuffer() | |
} | |
Unit | |
} | |
is BufferSignal.Emit -> { | |
if (signal.count != null && signal.count != bufferCount) { | |
// we have already emitted this buffer! | |
continue | |
} | |
if (buffer.isEmpty()) { | |
// there's nothing to emit | |
continue | |
} | |
doEmitBuffer() | |
} | |
}) | |
} | |
if (buffer.isNotEmpty()) { | |
doEmitBuffer() | |
} | |
} | |
this@bufferChunks.collect { | |
emitter.send(BufferSignal.Add(it)) | |
} | |
emitter.close() | |
} | |
} | |
} | |
private sealed class BufferSignal<T> { | |
class Emit<T>(val count: Int? = null) : BufferSignal<T>() | |
data class Add<T>(val item: T) : BufferSignal<T>() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment