Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active December 15, 2020 11:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fluidsonic/d26dd924e9665229284589f9155de1f6 to your computer and use it in GitHub Desktop.
Save fluidsonic/d26dd924e9665229284589f9155de1f6 to your computer and use it in GitHub Desktop.
import java.time.*
import kotlin.time.*
import kotlin.time.Duration
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/** Sentinel that [chunked] appends to the upstream element stream when the upstream flow completes. */
private object Done
/** Sentinel emitted by the timer flow inside [chunked] once `timeLimit` has elapsed. */
private object TimerExpired
/**
 * Groups upstream elements into lists.
 *
 * A list is emitted as soon as it holds [sizeLimit] elements, or once [timeLimit] has passed since
 * the first element of the current list was received, whichever happens first. When the upstream
 * completes, any buffered elements are emitted as a final (possibly shorter) list. Empty lists are
 * never emitted.
 *
 * @param sizeLimit maximum number of elements per emitted list; must be positive.
 * @param timeLimit maximum time a non-empty list is held back before it is emitted; must be positive.
 * @return a flow of non-empty lists that preserves the upstream element order.
 * @throws IllegalArgumentException if [sizeLimit] or [timeLimit] is not positive.
 */
@OptIn(ExperimentalTime::class)
fun <T> Flow<T>.chunked(
    sizeLimit: Int,
    timeLimit: Duration,
): Flow<List<T>> {
    require(sizeLimit > 0) { "'sizeLimit' must be positive: $sizeLimit" }
    require(timeLimit > Duration.ZERO) { "'timeLimit' must be positive: $timeLimit" }

    // Widened so the sentinels (Done / TimerExpired) can travel in the same stream as the elements.
    val upstream: Flow<Any?> = this

    return flow<List<T>> {
        // Timer commands: true = (re)start the timer, false = cancel it, null = shut the timer flow down.
        // replay = 1 keeps the most recent command for a subscriber that attaches late: `merge` starts
        // its branches concurrently, so without replay a command emitted before the timer branch has
        // subscribed would be dropped (a lost `true` disables the time limit for that chunk, a lost
        // `null` keeps the merged flow from ever completing).
        val timerEnabled = MutableSharedFlow<Boolean?>(replay = 1)
        var queue = mutableListOf<T>()

        // Emits the buffered elements downstream (if any) and starts a fresh buffer.
        suspend fun flush() {
            if (queue.isNotEmpty()) {
                emit(queue)
                queue = mutableListOf()
            }
        }

        merge(
            // Only signal Done on normal completion; failures and cancellation propagate on their own.
            upstream.onCompletion { cause -> if (cause == null) emit(Done) },
            timerEnabled
                .takeWhile { it != null }
                .flatMapLatest { enabled ->
                    if (enabled == true)
                        flow {
                            delay(timeLimit)
                            emit(TimerExpired)
                        }
                    else
                        emptyFlow()
                }
        )
            .collect { element ->
                // Identity checks: an upstream element with a permissive `equals` must never be
                // mistaken for one of the sentinels.
                when {
                    element === Done -> {
                        flush()
                        // Cancel a still-running timer first; otherwise flatMapLatest waits for it
                        // and completion is delayed by up to `timeLimit`.
                        timerEnabled.emit(false)
                        timerEnabled.emit(null)
                    }
                    element === TimerExpired -> flush()
                    else -> {
                        @Suppress("UNCHECKED_CAST")
                        val item = element as T
                        queue.add(item)
                        if (queue.size >= sizeLimit) {
                            flush()
                            // NOTE(review): a TimerExpired that was already in flight before this
                            // cancel takes effect can still flush the next chunk early.
                            timerEnabled.emit(false)
                        } else if (queue.size == 1) {
                            // First element of a new chunk starts the time limit.
                            timerEnabled.emit(true)
                        }
                    }
                }
            }
    }
}
/**
 * Demo driver: emits the integers 1 through 13 with pauses in between, chunks them with a size
 * limit of 5 and a time limit of 450 ms, and prints every chunk prefixed with the current instant.
 */
@OptIn(ExperimentalTime::class)
suspend fun main() {
    val numbers = flow {
        for (n in 1..2) emit(n)
        delay(100)
        emit(3)
        delay(100)
        for (n in 4..6) emit(n)
        delay(1000)
        for (n in 7..12) emit(n)
        delay(1000)
        emit(13)
    }

    val chunks = numbers.chunked(sizeLimit = 5, timeLimit = 450.milliseconds)

    chunks.collect { chunk ->
        println("${Instant.now()}: $chunk")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment