Skip to content

Instantly share code, notes, and snippets.

@zach-klippenstein
Created October 24, 2018 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zach-klippenstein/ee1f96dd12ea38509fc7e870109491fe to your computer and use it in GitHub Desktop.
Save zach-klippenstein/ee1f96dd12ea38509fc7e870109491fe to your computer and use it in GitHub Desktop.
Sketch of what a worker pool for limiting concurrent database operations might look like.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
 * A fixed-size pool of worker coroutines that limits how many database operations run
 * concurrently.
 *
 * Blocks submitted through [withDB] are handed to the workers over a rendezvous channel, so at
 * most `nWorkers` blocks execute at the same time; additional callers suspend until a worker
 * is free to take their task.
 *
 * @param nWorkers number of worker coroutines to start. Must be positive, otherwise no worker
 * would ever receive a task and every [withDB] call would suspend forever.
 * @param context context the workers (and therefore every submitted block) run in. If it
 * contains a [Job], the pool's own job is created as a child of that job.
 * @throws IllegalArgumentException if [nWorkers] is not positive.
 */
class DatabaseExecutor(
    nWorkers: Int,
    context: CoroutineContext
) {
    /**
     * A submitted block together with the deferred value its submitter waits on.
     *
     * Not an `inner` class: a task needs no reference to the executor that runs it.
     */
    private class Task<R>(val runnable: suspend CoroutineScope.() -> R) {
        private val _result = CompletableDeferred<R>()

        /** Should only be called from a worker. */
        suspend fun computeResult() {
            try {
                // Spin up a new coroutine scope that is a child of the worker scope to delimit
                // any coroutines started by the task. The try wraps the whole scope, not just
                // the call to the block: when a coroutine launched by the block fails,
                // coroutineScope itself throws that failure, and it must reach the submitter
                // instead of escaping into (and terminating) the worker loop.
                // The result is published only after the scope, including all of the task's
                // child coroutines, has completed.
                _result.complete(coroutineScope(runnable))
            } catch (e: Throwable) {
                _result.completeExceptionally(e)
                // A CancellationException raised while the worker's own job is no longer
                // active means the pool is being cancelled: let it propagate so the worker
                // stops. A cancellation that originated inside the task (worker still active)
                // is only reported to the submitter.
                if (e is CancellationException && coroutineContext[Job]?.isActive == false) {
                    throw e
                }
            }
        }

        /** Suspends until a worker has run the task, then returns its value or throws its failure. */
        suspend fun awaitResult() = _result.await()
    }

    init {
        // Validate before the supervisor job below is created, so a rejected argument does not
        // leave a never-completing child attached to the caller's job.
        require(nWorkers > 0) { "nWorkers must be positive, was $nWorkers" }
    }

    /** Rendezvous channel: `send` suspends until some worker is ready to receive the task. */
    private val taskQueue = Channel<Task<*>>()

    /**
     * Use a SupervisorJob because a failure of one worker must not cancel its sibling workers
     * or the parent job taken from the constructor's context.
     */
    private val job = SupervisorJob(context[Job])

    /** Jobs of the worker coroutines, retained so [shutdown] can wait for them to drain. */
    private val workers: List<Job>

    init {
        val workerScope = CoroutineScope(context + job)
        workers = List(nWorkers) {
            workerScope.launch {
                // Iteration ends once the channel is closed and every queued task was received.
                for (task in taskQueue) {
                    task.computeResult()
                }
            }
        }
    }

    /**
     * Runs [block] in the context passed to the constructor.
     *
     * The block's scope will have a new [Job] that is a child of any job from that context.
     *
     * Exceptions thrown from [block], or from coroutines it launches, will not be propagated up
     * (i.e. they will not cancel this worker or other workers), but will be re-thrown to the
     * caller. The call returns only after [block] and all coroutines it launched have finished.
     *
     * Sending to the task queue fails with an exception if [shutdown] has already been called.
     */
    suspend fun <R> withDB(block: suspend CoroutineScope.() -> R): R {
        val task = Task(block)
        taskQueue.send(task)
        return task.awaitResult()
    }

    /** Waits for all queued tasks to complete, then shuts down all workers. */
    suspend fun shutdown() {
        // Closing lets workers finish what is already queued and then leave their loops.
        taskQueue.close()
        // Join the workers rather than the supervisor job: a job created with SupervisorJob()
        // never completes on its own when its children finish, so joining it would suspend
        // forever.
        workers.joinAll()
        // Every worker is done; finish the supervisor job so that a parent job (if any) is not
        // kept waiting on it. Cancelling a child does not cancel the parent.
        job.cancel()
    }
}
/**
 * Example usage: creates an executor with ten workers on [Dispatchers.IO], runs one block
 * through it and shuts it down again.
 */
suspend fun main() {
    val database = DatabaseExecutor(
        nWorkers = 10,
        context = coroutineContext + Dispatchers.IO
    )
    // try/finally so the executor is shut down even when a database block throws; without it
    // a failing withDB call would skip shutdown() and leave the workers running.
    try {
        database.withDB {
            // suspending database calls
        }
        // …
    } finally {
        database.shutdown()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment