Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active March 28, 2018 14:45
Show Gist options
  • Save elizarov/34de14377c6a1caf50d560b35a47adc0 to your computer and use it in GitHub Desktop.
Save elizarov/34de14377c6a1caf50d560b35a47adc0 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.*
class DownloadQueue {
private sealed class Msg {
class Request(val name: String) : Msg() {
val answer = CompletableDeferred<Unit>()
}
class Result(val name: String, val error: Throwable? = null) : Msg()
}
// actor that processes downloads, we should use unlimited capacity here:
// manager would never send too many requests here and we don't want it to be blocked
private val processor: SendChannel<String> = actor(capacity = Channel.UNLIMITED) {
for (name in channel) {
println("Start processing $name!")
try {
//Simulated processing
delay(1000L)
println("End processing $name, notifying...")
manager.send(Msg.Result(name))
} catch (e: Throwable) {
manager.send(Msg.Result(name, e))
}
println("End processing $name!")
}
}
// actor that manages download queue
private val manager: SendChannel<Msg> = actor {
val requests = HashMap<String, ArrayList<Msg.Request>>()
for (msg in channel) {
when (msg) {
is Msg.Request -> {
requests.getOrPut(msg.name) {
// send to processor when the first request comes in
processor.offer(msg.name) // would always succeed
ArrayList()
} += msg // add to the list
}
is Msg.Result -> {
for (resp in requests.remove(msg.name)!!) {
if (msg.error != null)
resp.answer.completeExceptionally(msg.error)
else
resp.answer.complete(Unit)
}
}
}
}
}
suspend fun enqueue(name: String) {
val req = Msg.Request(name)
manager.send(req)
req.answer.await()
}
}
fun main(args: Array<String>) = runBlocking<Unit> {
val q = DownloadQueue()
launch(coroutineContext) {
for (i in 1..3) {
val x = "$i"
println("A>>$x")
q.enqueue(x)
println("A<<$x")
}
}
launch(coroutineContext) {
for (i in 2..5) {
val x = "$i"
println("B>>$x")
q.enqueue(x)
println("B<<$x")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment