Skip to content

Instantly share code, notes, and snippets.

@tkroman
Last active June 7, 2020 18:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkroman/96bda51583893aa8135a4bf890348650 to your computer and use it in GitHub Desktop.
Save tkroman/96bda51583893aa8135a4bf890348650 to your computer and use it in GitHub Desktop.
c.kt
@OptIn(
ExperimentalStdlibApi::class,
ExperimentalCoroutinesApi::class,
ExperimentalTime::class,
ExperimentalUnsignedTypes::class,
ExperimentalTypeInference::class
)
fun <T> Flow<T>.chunked(n: UInt, t: Duration): Flow<List<T>> {
val maxChunkSize = n.toInt()
val delayMillis = t.toLongMilliseconds()
val (endMarker, pipe) = markEnd()
return channelFlow<List<T>> {
var lastSent: Long = Platform.now()
var done: Boolean? = null
val buf = ArrayDeque<T>(maxChunkSize)
val bufMutex = Mutex()
launch {
while (isActive && done != true) {
if (Platform.now() - lastSent >= delayMillis) {
if (done == false) done = true
lastSent = Platform.now()
val toSend = bufMutex.withLock {
if (buf.isNotEmpty()) {
buf.drain()
} else {
null
}
}
if (toSend != null) {
send(toSend)
}
}
delay(delayMillis - (Platform.now() - lastSent))
}
}
pipe.collect { el ->
if (el === endMarker) {
done = false
} else {
val toSend = bufMutex.withLock {
buf.add(el as T)
if (buf.size == maxChunkSize) buf.drain() else null
}
if (toSend != null) {
send(toSend)
lastSent = Platform.now()
}
}
}
}
}
private fun <T> Flow<T>.markEnd(): Pair<Any, Flow<Any?>> {
val endMarker = Any()
val f = flow { collect { emit(it) }; emit(endMarker) }
return endMarker to f
}
@OptIn(ExperimentalStdlibApi::class)
fun <A> ArrayDeque<A>.drain(): List<A> {
return buildList(size) {
while (this@drain.isNotEmpty()) {
add(this@drain.removeFirst())
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment