Skip to content

Instantly share code, notes, and snippets.

@zach-klippenstein
Last active November 5, 2018 18:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zach-klippenstein/38bd9a6fc4a84ee3e2435956ada18504 to your computer and use it in GitHub Desktop.
Save zach-klippenstein/38bd9a6fc4a84ee3e2435956ada18504 to your computer and use it in GitHub Desktop.
interface Host {
suspend fun getMovies(): List<Movies>
suspend fun setWatched(movie: Movie): Boolean
close()
}
class HostImpl(
// You could have a different WorkerPool for each system you need to talk to
// that has its own throttling requirements.
private val workerPool: WorkerPool
) {
init {
// If your Host type has some explicit start mechanism this should go there instead.
workerPool.startWorkers()
}
override suspend fun getMovies(): List<Movies> {
val request = GetMoviesRequest()
val response = workerPool.execute(request)
return parseGetMoviesResponse(response)
}
override suspend fun setWatched(movie: Movie): Boolean {
val request = SetWatchedRequest(movie)
val response = workerPool.execute(request)
return parseSetWatchedResponse(response)
}
override fun close() = workerPool.close()
}
private class WorkerPool(
private val httpClient: HttpClient,
private val maxConcurrentTasks: Int,
private val context: CoroutineContext = Dispatchers.IO
) : CoroutineScope {
private class Task(val request: Request) {
val response = CompletableDeferred<Response>()
}
private val tasks = Channel<Task>(UNLIMITED)
private val job = Job(parent = context[Job])
override val coroutineContext = context + job
fun startWorkers() {
repeat(maxConcurrentTasks) {
launch {
for (task in tasks) executeTask(task)
}
}
}
suspend fun execute(request: Request): Response {
val task = Task(request)
tasks.send(task)
return task.response.await()
}
/** @param immediately If false, will finish processing all queued tasks before terminating workers. */
fun close(immediately: Boolean = false) {
tasks.close()
if (immediately) job.cancel()
}
private suspend fun executeTask(task: Task) {
try {
val response = httpClient.doRequest(task.request)
task.response.complete(response)
} catch (e: Throwable) {
task.response.completeExceptionally(e)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment