Skip to content

Instantly share code, notes, and snippets.

@Tolriq
Created September 10, 2022 08:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Tolriq/5fe5cdc86f95308e65ef1d6540a718bb to your computer and use it in GitHub Desktop.
Save Tolriq/5fe5cdc86f95308e65ef1d6540a718bb to your computer and use it in GitHub Desktop.
Db worker pool
/**
 * Fixed-size pool of worker coroutines that run submitted suspending requests.
 *
 * Callers hand a request to [execute]; it is queued on [tasks] and picked up by one of
 * [MAX_TASKS] workers launched in the `init` block. The outcome (value or exception) is
 * handed back to the caller through a [CompletableDeferred] owned by the caller's job.
 *
 * When a result cannot be delivered (the caller is already gone) and that result is a
 * [CursorWrapper], the pool closes it on a best-effort basis so it is not left open.
 */
class DatabaseWorkerPool : CoroutineScope {

    /**
     * A queued unit of work together with the slot its outcome is published to.
     *
     * @param owner job of the submitting coroutine; it becomes the parent of [response].
     * @property request the suspending work to run on a worker.
     */
    internal class Task<R>(owner: Job, val request: suspend () -> R) {
        /** Completed by a worker with the value returned, or the exception thrown, by [request]. */
        val response = CompletableDeferred<R>(owner)
    }

    /** Unbounded queue of pending tasks shared by every worker. */
    @Suppress("MemberVisibilityCanBePrivate")
    internal val tasks = Channel<Task<*>>(Channel.UNLIMITED)

    // Declared before the init block on purpose: the workers launched there need both
    // this context and the `tasks` channel to be initialised already.
    override val coroutineContext = Dispatchers.IO + SupervisorJob()

    init {
        repeat(MAX_TASKS) {
            // NOTE(review): `Dispatchers.Background` is not one of the standard
            // kotlinx.coroutines dispatchers; presumably a project-level extension
            // property. Confirm it is in scope for this file.
            launch(Dispatchers.Background) {
                // Each worker waits for its first task on the launch dispatcher and only
                // then switches to the pool's own context for the rest of its life.
                val initialTask = try {
                    tasks.receive()
                } catch (e: Exception) {
                    // Receiving failed (channel closed or worker cancelled): nothing to run.
                    return@launch
                }
                withContext(this@DatabaseWorkerPool.coroutineContext) {
                    executeTask(initialTask)
                    for (task in tasks) executeTask(task)
                }
            }
        }
    }

    /**
     * Runs [task]'s request and publishes the outcome to its response.
     *
     * Any [Throwable] thrown by the request is forwarded to the response instead of
     * propagating, so one failing request does not terminate the worker loop.
     */
    private suspend fun <R> executeTask(task: Task<R>) {
        try {
            val result = task.request()
            if (!task.response.complete(result)) {
                // The response was already completed/cancelled, so nobody will receive
                // this result. Close it if it is a cursor; closing is best effort.
                runCatching {
                    (result as? CursorWrapper)?.close()
                }
            }
        } catch (e: Throwable) {
            task.response.completeExceptionally(e)
        }
    }

    /**
     * Queues [request] for execution on the pool and suspends until its outcome is known.
     *
     * @param request the suspending work to run on a worker.
     * @return the value produced by [request].
     * @throws Throwable whatever [request] threw, or the failure raised while awaiting
     * the response (for example when the calling coroutine is cancelled).
     */
    suspend fun <R> execute(request: suspend () -> R): R {
        // Fully qualified: inside this class a bare `coroutineContext` would resolve to
        // the pool's own property rather than the caller's context.
        val task = Task(kotlin.coroutines.coroutineContext.job, request)
        tasks.send(task)
        return try {
            task.response.await()
        } catch (e: Throwable) {
            // Awaiting failed, but a worker may still have delivered a value. If that
            // value is a cursor, close it (best effort) before rethrowing.
            if (task.response.isCompleted) {
                runCatching {
                    (task.response.getCompleted() as? CursorWrapper)?.close()
                }
            }
            throw e
        }
    }

    private companion object {
        /** Number of worker coroutines started by the pool. */
        const val MAX_TASKS = 4
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment