Last active
March 28, 2018 14:45
-
-
Save elizarov/34de14377c6a1caf50d560b35a47adc0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.channels.* | |
import kotlinx.coroutines.experimental.* | |
/**
 * A download queue that coalesces concurrent requests for the same name.
 *
 * Two actors cooperate:
 *  - the manager keeps, per name, the list of callers currently waiting for it and
 *    forwards a name to the processor only when the first request for it arrives;
 *  - the processor performs the (simulated) download and reports the outcome back
 *    to the manager, which then completes every waiting caller.
 */
class DownloadQueue {
    /** Messages handled by the manager actor. */
    private sealed class Msg {
        /** A caller asking for [name]; [answer] is completed when processing of [name] ends. */
        class Request(val name: String) : Msg() {
            val answer = CompletableDeferred<Unit>()
        }

        /** Outcome of processing [name]; [error] is `null` when processing succeeded. */
        class Result(val name: String, val error: Throwable? = null) : Msg()
    }

    // Actor that processes downloads. We use unlimited capacity here: the manager
    // never sends too many requests and we don't want it to be blocked.
    private val processor: SendChannel<String> = actor(capacity = Channel.UNLIMITED) {
        for (name in channel) {
            println("Start processing $name!")
            try {
                // Simulated processing
                delay(1000L)
                println("End processing $name, notifying...")
                manager.send(Msg.Result(name))
            } catch (e: Throwable) {
                // Deliberately catch everything: the failure is forwarded to the manager
                // so that callers waiting on this name receive the error.
                manager.send(Msg.Result(name, e))
            }
            println("End processing $name!")
        }
    }

    // Actor that manages the download queue. All access to `requests` happens inside
    // this actor's message loop.
    private val manager: SendChannel<Msg> = actor {
        // name -> callers waiting for that name to finish processing
        val requests = HashMap<String, ArrayList<Msg.Request>>()
        for (msg in channel) {
            when (msg) {
                is Msg.Request -> {
                    requests.getOrPut(msg.name) {
                        // Send to the processor only when the first request for this name comes in.
                        processor.offer(msg.name) // would always succeed: the processor channel is unlimited
                        ArrayList()
                    } += msg // add to the list of waiters
                }
                is Msg.Result -> {
                    // A result is only produced for a name that was forwarded to the processor,
                    // so a waiter list must exist; fail with a descriptive message otherwise.
                    val waiters = checkNotNull(requests.remove(msg.name)) {
                        "Received a result for '${msg.name}' but no requests are pending for it"
                    }
                    val error = msg.error
                    for (resp in waiters) {
                        if (error != null) {
                            resp.answer.completeExceptionally(error)
                        } else {
                            resp.answer.complete(Unit)
                        }
                    }
                }
            }
        }
    }

    /**
     * Requests processing of [name] and suspends until it has finished.
     *
     * If [name] is already pending, this call joins the existing waiters rather than
     * starting another run. If processing fails, the failure is rethrown to the caller
     * by `await()`.
     */
    suspend fun enqueue(name: String) {
        val req = Msg.Request(name)
        manager.send(req)
        req.answer.await()
    }
}
/**
 * Demo driver: two clients, "A" (names 1..3) and "B" (names 2..5), enqueue downloads
 * one after another on the same [DownloadQueue], so names 2 and 3 are requested by both.
 */
fun main(args: Array<String>) = runBlocking<Unit> {
    val queue = DownloadQueue()
    // Each client is a tag used in the log output plus the range of names it requests.
    val clients = listOf("A" to 1..3, "B" to 2..5)
    for ((tag, ids) in clients) {
        launch(coroutineContext) {
            for (id in ids) {
                val name = id.toString()
                println("$tag>>$name")
                queue.enqueue(name)
                println("$tag<<$name")
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment