Skip to content

Instantly share code, notes, and snippets.

@NikkyAI
Last active December 15, 2020 09:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NikkyAI/025436616e81ff0a1711a6144fba0066 to your computer and use it in GitHub Desktop.
Save NikkyAI/025436616e81ff0a1711a6144fba0066 to your computer and use it in GitHub Desktop.
// File-private logger used by the declarations in this file; obtained from kotlin-logging's lambda-based factory.
private val logger = KotlinLogging.logger {}
/**
 * Groups the elements of this flow into lists ("chunks").
 *
 * A chunk is emitted when either
 *  - the buffer reaches [chunkSize] elements, or
 *  - [delayMillis] elapsed on the timer and the buffer is not empty.
 *
 * The timer is restarted every time a full chunk is emitted. When the upstream flow completes,
 * whatever is still buffered is emitted immediately as a final (possibly smaller) chunk.
 * Empty chunks are never emitted, and chunks are emitted in upstream order.
 *
 * All buffering state is created per collection, so the returned flow can safely be collected
 * more than once.
 *
 * @param chunkSize maximum number of elements per emitted chunk; must be positive.
 * @param delayMillis how long a partially filled buffer may wait before it is flushed; must be positive.
 * @return a flow of non-empty lists, each holding at most [chunkSize] elements.
 * @throws IllegalArgumentException if [chunkSize] or [delayMillis] is not positive.
 */
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunked(
    chunkSize: Int = 64,
    delayMillis: Long = 250
): Flow<List<T>> {
    require(chunkSize > 0) { "chunkSize must be positive, was $chunkSize" }
    // A non-positive delay would turn the timer loop into a busy spin.
    require(delayMillis > 0) { "delayMillis must be positive, was $delayMillis" }

    return channelFlow<List<T>> {
        // State lives inside the builder so that every collection gets its own buffer.
        val lock = ReentrantLock()
        // Plain list: every access happens under `lock`, so a copy-on-write list only adds copying cost.
        val page = mutableListOf<T>()

        // Takes everything out of the buffer; returns null when there is nothing to send.
        // Must only be called while holding `lock`.
        fun drainLocked(): List<T>? {
            if (page.isEmpty()) return null
            val snapshot = page.toList()
            page.clear()
            return snapshot
        }

        fun startTimer(): Job = launch(MDCContext() + CoroutineName("chunked-timer") + Dispatchers.IO) {
            logger.info { "running timer in ${Thread.currentThread().name}" }
            logger.debug { "starting timer" }
            while (true) {
                // The only point where this job reacts to cancellation.
                delay(delayMillis)
                // Drain + send must not be torn apart by cancellation: once elements have left the
                // buffer they have to reach the channel, otherwise they are lost. Fully qualified
                // names are used so that no additional imports are needed.
                kotlinx.coroutines.withContext(kotlinx.coroutines.NonCancellable) {
                    val toSend = lock.withLock { drainLocked() }
                    if (toSend != null) {
                        logger.debug { "emitting: $toSend" }
                        channel.send(toSend)
                        logger.debug { "emitted" }
                    }
                }
            }
        }

        var timerJob: Job = startTimer()
        this@chunked.collect { element ->
            logger.info { "processing item in ${Thread.currentThread().name}" }
            val fullPage = lock.withLock {
                page.add(element)
                if (page.size >= chunkSize) drainLocked() else null
            }
            if (fullPage != null) {
                logger.debug { "page full, resetting timer" }
                // Wait for the timer to finish any in-flight flush so that an older partial chunk
                // is delivered before this one.
                timerJob.cancel()
                timerJob.join()
                logger.debug { "emitting: $fullPage" }
                channel.send(fullPage)
                timerJob = startTimer()
            }
        }

        logger.debug { "flow completing" }
        logger.debug { "cancelling timer" }
        timerJob.cancel()
        timerJob.join()
        // The timer is gone, so flush the tail here instead of waiting and racing with it.
        val remainder = lock.withLock { drainLocked() }
        if (remainder != null) {
            logger.debug { "emitting: $remainder" }
            channel.send(remainder)
        }
        logger.debug { "flow completed" }
    }
}
// Buffer between queue() callers and the background consumer; its capacity comes from configuration.
private val ingestChannel: Channel<QueueItem> = Channel(trackerServiceProperties.channelCapacity)
init {
    // Starts the background consumer: items sent to ingestChannel are grouped by chunked()
    // and every resulting chunk is passed to processChunk().
    // NOTE(review): GlobalScope means nothing visible here ever cancels this coroutine —
    // consider launching it in a scope owned by the enclosing component.
    GlobalScope.launch(MDCContext() + Dispatchers.IO) {
        val chunks = ingestChannel.consumeAsFlow().chunked(
            chunkSize = trackerServiceProperties.chunkSize,
            delayMillis = trackerServiceProperties.delayMillis,
        )
        chunks.collect { chunk: List<QueueItem> ->
            processChunk(chunk)
        }
    }
}
/**
 * Hands [item] to the ingest channel for background processing.
 *
 * Delegates to the channel's `send`, so the call suspends for as long as that send suspends.
 */
suspend fun queue(item: QueueItem) = ingestChannel.send(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment