Skip to content

Instantly share code, notes, and snippets.

@fkorotkov
Created May 12, 2020 21:22
Show Gist options
  • Save fkorotkov/38586fd3d42b15381fd9b7bb6c99dcfd to your computer and use it in GitHub Desktop.
Save fkorotkov/38586fd3d42b15381fd9b7bb6c99dcfd to your computer and use it in GitHub Desktop.
Code samples for a blog post
/**
 * [Loader] that queues every [loadByIds] call on a shared channel and lets a fixed number of
 * worker coroutines serve the queue using [delegateLoader].
 *
 * @param poolSize size of the backing fixed thread pool and number of worker coroutines started on it.
 * @param keyBatchSizeLimit ID-count threshold made available to the workers' batching logic.
 * @param delegateLoader loader made available to the workers for the actual lookups.
 */
class BatchLoader<ID, T>(
    poolSize: Int = 8,
    private val keyBatchSizeLimit: Int = 100,
    private val delegateLoader: Loader<ID, T>
) : Loader<ID, T>, CoroutineScope {
    // NOTE(review): nothing visible in this snippet shuts this executor down; confirm the owner
    // of a BatchLoader has a way to release its threads.
    override val coroutineContext: CoroutineContext = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()

    // UNLIMITED capacity lets callers enqueue requests without suspending in send().
    private val requests = Channel<LoadRequest<ID, T>>(Channel.UNLIMITED)

    private inner class RequestWorker {
        /* ... */
    }

    init {
        // One worker per pool thread. The scope's own context is used implicitly, so it does not
        // need to be passed to launch() again.
        repeat(poolSize) {
            launch {
                RequestWorker().start()
            }
        }
    }

    /**
     * Queues a request for [ids] and suspends until a worker completes it.
     *
     * @param ids the IDs to load; an empty set returns an empty map immediately without queueing.
     * @return the map the worker completed the request with.
     * @throws Throwable whatever failure the worker completed the request with.
     */
    override suspend fun loadByIds(ids: Set<ID>): Map<ID, T> {
        // No IDs means nothing to load: skip the queue round trip and the delegate call.
        if (ids.isEmpty()) return emptyMap()
        val request = LoadRequest<ID, T>(ids)
        requests.send(request)
        return request.result.await()
    }
}
/**
 * Skeleton of the batching [Loader]: shows only the constructor parameters and the
 * implemented interface. The class body is elided in this snippet.
 */
class BatchLoader<ID, T>(
// Not a property: only visible to initializers inside the class.
poolSize: Int = 8,
// Stored as a private property for use inside the class body.
private val keyBatchSizeLimit: Int = 100,
// The loader this class wraps; required, no default.
private val delegateLoader: Loader<ID, T>
) : Loader<ID, T> {
// ...
}
/**
 * A queued load: the requested [ids] together with the [result] handle through which the
 * answer (or failure) is delivered back to the waiting caller.
 *
 * @property ids the IDs the caller asked for.
 * @property result deferred map for this request; defaults to a fresh, not-yet-completed deferred.
 */
private data class LoadRequest<ID, T>(
    val ids: Set<ID>,
    val result: CompletableDeferred<Map<ID, T>> = CompletableDeferred()
)
/**
 * Sketch of the caller-facing half of the batching [Loader]: requests are placed on a channel
 * and the caller waits on the request's deferred result. Constructor parameters are elided.
 */
class BatchLoader<ID, T>(
    /* ... */
) : Loader<ID, T> {
    // UNLIMITED capacity lets callers enqueue requests without suspending in send().
    private val requests = Channel<LoadRequest<ID, T>>(Channel.UNLIMITED)

    /**
     * Queues a request for [ids] and suspends until that request's result is completed.
     *
     * @param ids the IDs to load; an empty set returns an empty map immediately without queueing.
     * @return the map the request was completed with.
     * @throws Throwable whatever failure the request was completed with.
     */
    override suspend fun loadByIds(ids: Set<ID>): Map<ID, T> {
        // No IDs means nothing to load: skip the queue round trip entirely.
        if (ids.isEmpty()) return emptyMap()
        val request = LoadRequest<ID, T>(ids) // create a request
        requests.send(request) // queue the request
        return request.result.await() // wait for the request to be fulfilled
    }
}
/**
 * Suspending bulk lookup of values of type [T] keyed by [ID].
 */
interface Loader<ID, T> {
/**
 * Loads the values for the given [ids].
 *
 * @param ids the keys to look up.
 * @return a map from ID to loaded value.
 * NOTE(review): whether IDs without a value are omitted from the map is not specified here;
 * confirm per implementation.
 */
suspend fun loadByIds(ids: Set<ID>): Map<ID, T>
}
/**
 * Worker that drains the enclosing loader's `requests` channel, merges queued requests into a
 * single call to `delegateLoader`, and hands each caller the subset of results it asked for.
 */
private inner class RequestWorker {
    /**
     * Runs the worker loop. It has no normal exit: it ends only when something throws out of
     * the loop (for example `receive()` on cancellation, or a non-[Exception] failure below).
     */
    suspend fun start() {
        while (true) {
            val requestsToProcess = ArrayList<LoadRequest<ID, T>>()
            val idsToLoad = HashSet<ID>()

            // Suspend until the channel has at least one request.
            val firstRequest = requests.receive()
            requestsToProcess.add(firstRequest)
            idsToLoad.addAll(firstRequest.ids)

            // Greedily pick up whatever else is already queued. The limit is checked before each
            // extra request is taken, so the merged ID set may end up larger than keyBatchSizeLimit.
            while (idsToLoad.size < keyBatchSizeLimit) {
                // Non-suspending poll: a zero timeout makes select return null when nothing is queued.
                val additionalRequest = select<LoadRequest<ID, T>?> {
                    requests.onReceive { it }
                    onTimeout(0) { null }
                } ?: break
                requestsToProcess.add(additionalRequest)
                idsToLoad.addAll(additionalRequest.ids)
            }

            try {
                val loadedObjects = delegateLoader.loadByIds(idsToLoad)
                requestsToProcess.forEach { pending ->
                    // Each caller only sees the entries for the IDs it requested.
                    pending.result.complete(loadedObjects.filterKeys { it in pending.ids })
                }
            } catch (e: Throwable) {
                // Catch Throwable, not just Exception: otherwise an Error from the delegate would
                // leave every caller in this batch suspended forever on an uncompleted deferred.
                requestsToProcess.forEach { it.result.completeExceptionally(e) }
                // Exceptions keep the worker alive for the next batch, as before. Anything more
                // severe is still propagated once the callers have been released.
                if (e !is Exception) throw e
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment