Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active June 22, 2021 19:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save elizarov/896f2c066c8841377f9db0b1f0a90f96 to your computer and use it in GitHub Desktop.
Save elizarov/896f2c066c8841377f9db0b1f0a90f96 to your computer and use it in GitHub Desktop.
timedBuffer
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/**
 * Accumulates values until they are drained as a single batch.
 *
 * Both operations are marked `@Synchronized`, so adding and draining on the
 * same instance are mutually exclusive.
 */
private class Buffer<T> {
    private val pending = arrayListOf<T>()

    /** Appends [value] to the batch currently being accumulated. */
    @Synchronized
    fun add(value: T) {
        pending.add(value)
    }

    /**
     * Drains the accumulated values.
     *
     * @return a copy of everything added since the previous drain, or `null`
     * when nothing has been added; the internal storage is emptied whenever a
     * non-null batch is returned.
     */
    @Synchronized
    fun pollIfNotEmpty(): List<T>? {
        if (pending.isEmpty()) return null
        // Copy before clearing so the returned batch is detached from internal storage.
        val snapshot = pending.toList()
        pending.clear()
        return snapshot
    }
}
/**
 * Groups upstream values into lists, emitting the values gathered so far
 * after every [millis] milliseconds of delay.
 *
 * A timer coroutine runs alongside upstream collection and sends the current
 * batch each time its delay elapses; ticks during which nothing was collected
 * emit nothing. When upstream completes, the timer is stopped and whatever is
 * still buffered is emitted as a final batch.
 *
 * Caveat: a batch that the timer has already taken from the buffer but that is
 * still suspended in `send` at the moment upstream completes is dropped,
 * because the timer coroutine is cancelled at that point.
 *
 * @param millis delay between flushes, in milliseconds; must be positive.
 * @return a flow of non-empty batches, each in upstream arrival order.
 * @throws IllegalArgumentException if [millis] is not positive.
 */
fun <T> Flow<T>.timedBuffer(millis: Long): Flow<List<T>> {
    // delay() returns immediately for non-positive values, which would turn the
    // timer loop below into a spin that never suspends while the buffer is empty.
    require(millis > 0) { "Buffer period should be positive, but was $millis" }
    return channelFlow {
        val buffer = Buffer<T>()

        // Sends the accumulated batch downstream, if there is one.
        suspend fun flush() { buffer.pollIfNotEmpty()?.let { send(it) } }

        // Periodic flusher; runs concurrently with the upstream collection below.
        val job = launch {
            while (isActive) {
                delay(millis)
                flush()
            }
        }

        // Collect the receiver flow (this@timedBuffer) into the shared buffer.
        collect { buffer.add(it) }

        // Upstream is done: stop the timer and wait for it to finish before the
        // final flush, so the two flushes never run at the same time.
        job.cancelAndJoin()
        flush()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment