Skip to content

Instantly share code, notes, and snippets.

@objcode
Last active January 15, 2019 01:36
Show Gist options
  • Save objcode/32bd49266ca1eb5877d8f18c8f3451b2 to your computer and use it in GitHub Desktop.
Save objcode/32bd49266ca1eb5877d8f18c8f3451b2 to your computer and use it in GitHub Desktop.
/**
 * Three-stage download pipeline built from coroutines and channels:
 *
 * 1. a single *downloader* resolves each incoming [Reference] to a [Location] and
 *    requests every distinct pending location exactly once;
 * 2. `N_WORKERS` *workers* download the requested locations;
 * 3. a single *processor* hands each downloaded content to every reference that was
 *    waiting for that location.
 *
 * The [requested] map is only touched by the downloader and the processor, which both
 * run on the one-thread [apiThread] dispatcher, so it needs no further synchronization.
 *
 * @property scope scope in which all pipeline coroutines are launched.
 */
private class DownloadActor(val scope: CoroutineScope) {
    /** References waiting for each location whose download is still in flight. */
    private val requested = mutableMapOf<Location, MutableList<Reference>>()

    /** Single-threaded dispatcher that confines every access to [requested]. */
    private val apiThread = newFixedThreadPoolContext(1, "DownloadActor")

    /**
     * Reads [references] until that channel is closed, grouping them by resolved location.
     * Only the first reference for a location not currently pending triggers a send to
     * [locations]; later ones are queued behind the in-flight download.
     */
    private fun downloader(
        references: ReceiveChannel<Reference>,
        locations: SendChannel<Location>,
    ) = scope.launch(apiThread) {
        for (ref in references) {
            val loc = ref.resolveLocation()
            val pending = requested[loc]
            if (pending == null) {
                requested[loc] = mutableListOf(ref)
                locations.send(loc)
            } else {
                pending.add(ref)
            }
        }
    }

    /**
     * Downloads every location received from [locations] and forwards the result to
     * [contents]. Finishes once [locations] is closed and drained.
     */
    private fun worker(
        locations: ReceiveChannel<Location>,
        contents: SendChannel<LocContent>,
    ) = scope.launch {
        for (loc in locations) {
            val content = downloadContent(loc)
            contents.send(LocContent(loc, content))
        }
    }

    /**
     * Delivers each downloaded content to all references queued for its location.
     * Finishes once [contents] is closed and drained.
     */
    private fun processor(contents: ReceiveChannel<LocContent>) = scope.launch(apiThread) {
        for (locContent in contents) {
            // A location is only sent for download after being registered in `requested`,
            // so a missing entry means the pipeline invariant was broken.
            val refs = checkNotNull(requested.remove(locContent.loc)) {
                "Received content for a location with no pending references: ${locContent.loc}"
            }
            for (ref in refs) {
                processContent(ref, locContent.content)
            }
        }
    }

    /**
     * Starts the pipeline for [references] and returns immediately; the work runs in
     * coroutines launched in [scope].
     *
     * When [references] is closed and fully consumed, the internal channels are closed
     * stage by stage so that the workers and the processor terminate instead of staying
     * suspended forever on channels that will never receive another element.
     */
    fun process(references: ReceiveChannel<Reference>) {
        // Both are rendezvous channels (default capacity).
        val locations = Channel<Location>()
        val contents = Channel<LocContent>()

        val workers = List(N_WORKERS) { worker(locations, contents) }
        val downloaderJob = downloader(references, locations)
        processor(contents)

        // Orderly shutdown: each channel is closed only after all of its producers are done.
        scope.launch {
            downloaderJob.join()
            locations.close()
            workers.forEach { it.join() }
            contents.close()
        }
    }

    /**
     * Releases the thread owned by [apiThread]. Call only after every pipeline started
     * with [process] has finished; coroutines can no longer be dispatched afterwards.
     */
    fun close() {
        apiThread.close()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment