-
-
Save NikkyAI/025436616e81ff0a1711a6144fba0066 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** File-private logger; the chunking operator in this file writes its diagnostics through it. */
private val logger = KotlinLogging.logger { }
/**
 * Groups the elements of this flow into lists, emitting a list as soon as either
 * [chunkSize] elements have been buffered or [delayMillis] milliseconds have passed
 * on the flush timer while at least one element is buffered.
 *
 * Behaviour:
 * - Every collection of the returned flow gets its own buffer, lock and timer.
 * - A size-triggered emission restarts the timer.
 * - Chunks are emitted in upstream order and no buffered element is dropped when the
 *   timer is restarted or when the upstream completes.
 * - When the upstream completes, whatever is still buffered is emitted immediately
 *   as a final (possibly shorter) chunk; empty chunks are never emitted.
 *
 * @param chunkSize maximum number of elements per emitted list; must be positive.
 * @param delayMillis timer period in milliseconds after which a partially filled
 *   buffer is flushed; must be positive.
 * @return a flow of non-empty lists containing the upstream elements in order.
 * @throws IllegalArgumentException if [chunkSize] or [delayMillis] is not positive.
 */
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.chunked(
    chunkSize: Int = 64,
    delayMillis: Long = 250,
): Flow<List<T>> {
    require(chunkSize > 0) { "chunkSize must be positive, was $chunkSize" }
    // delay() returns immediately for non-positive values, which would turn the timer
    // into a busy loop without any suspension point (and therefore without cancellation).
    require(delayMillis > 0) { "delayMillis must be positive, was $delayMillis" }

    return channelFlow {
        // State lives inside the builder so that each collection starts with a fresh,
        // unshared buffer.
        val lock = ReentrantLock()
        // A plain list is enough: every access below happens while holding `lock`.
        val page = ArrayList<T>()

        // Removes and returns everything buffered so far, or null when nothing is buffered.
        fun drainPage(): List<T>? = lock.withLock {
            if (page.isEmpty()) {
                null
            } else {
                page.toList().also { page.clear() }
            }
        }

        fun startTimer(): Job = launch(MDCContext() + CoroutineName("chunked-timer") + Dispatchers.IO) {
            logger.info { "running timer in ${Thread.currentThread().name}" }
            logger.debug { "starting timer" }
            while (true) {
                delay(delayMillis)
                // Draining and sending must not be torn apart by cancellation: once the
                // elements have left `page`, this coroutine is the only one that can still
                // deliver them. Names are fully qualified so no extra imports are required.
                kotlinx.coroutines.withContext(kotlinx.coroutines.NonCancellable) {
                    val toSend = drainPage()
                    if (toSend != null) {
                        logger.debug { "emitting: $toSend" }
                        channel.send(toSend)
                        logger.debug { "emitted" }
                    }
                }
            }
        }

        var timerJob: Job = startTimer()
        this@chunked.collect { element ->
            logger.info { "processing item in ${Thread.currentThread().name}" }
            val fullPage = lock.withLock {
                page.add(element)
                if (page.size >= chunkSize) {
                    page.toList().also { page.clear() }
                } else {
                    // nothing to send yet
                    null
                }
            }
            if (fullPage != null) {
                logger.debug { "page full, resetting timer" }
                timerJob.cancel()
                // Wait for a timer flush that may already be in flight so that its
                // (older) chunk is delivered before this one.
                timerJob.join()
                logger.debug { "emitting: $fullPage" }
                channel.send(fullPage)
                timerJob = startTimer()
            }
        }

        logger.debug { "flow completing" }
        logger.debug { "cancelling timer" }
        timerJob.cancel()
        timerJob.join()
        // The upstream is done and the timer is stopped, so this coroutine is the only
        // one left that can deliver the remaining elements.
        drainPage()?.let { remainder ->
            logger.debug { "emitting: $remainder" }
            channel.send(remainder)
        }
        logger.debug { "flow completed" }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Queue of pending items; its capacity is taken from `trackerServiceProperties.channelCapacity`. */
private val ingestChannel: Channel<QueueItem> = Channel(capacity = trackerServiceProperties.channelCapacity)
init {
    // Starts the consumer side of the queue: everything sent to ingestChannel is read as a
    // flow, batched by chunked() using the configured chunk size and delay, and each batch
    // is passed to processChunk().
    // NOTE(review): the coroutine is launched in GlobalScope and nothing visible here
    // cancels it or handles a failure of processChunk() — confirm that this is intended.
    GlobalScope.launch(MDCContext() + Dispatchers.IO) {
        val batches = ingestChannel
            .consumeAsFlow()
            .chunked(
                chunkSize = trackerServiceProperties.chunkSize,
                delayMillis = trackerServiceProperties.delayMillis,
            )
        batches.collect { chunk: List<QueueItem> ->
            processChunk(chunk)
        }
    }
}
/**
 * Hands [item] over to the ingest channel for batched processing.
 *
 * Delegates to the channel's `send`, so the call may suspend until the channel
 * accepts the item.
 */
suspend fun queue(item: QueueItem) = ingestChannel.send(item)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment