Skip to content

Instantly share code, notes, and snippets.

@twyatt
Last active October 5, 2018 20:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save twyatt/268dae8ffcbd6b25b23e2ae6bad7a85f to your computer and use it in GitHub Desktop.
Save twyatt/268dae8ffcbd6b25b23e2ae6bad7a85f to your computer and use it in GitHub Desktop.
Demonstrates wrapping a callback based class to provide interaction via Kotlin Coroutine Channels

Demonstrates closing channel 1. Channel 2 continues to be consumable (wrapper is not closed).

(cancelMode = CancelMode.CHANNEL)

main → threads=[Monitor Ctrl-Break, Signal Dispatcher, Finalizer, main @coroutine#1, Reference Handler]
Wrapper[1] → createChannel → <start>
Wrapper[1] → client.observe
Client → observe
Client → observe → thread → handler.onResponse(0)
Wrapper[1] → onResponse → <start>
Wrapper[1] → Configuring channel.invokeOnClose
Wrapper[1] → createChannel → <end>
Wrapper[2] → createChannel → <start>
Wrapper[2] → client.observe
Client → observe
Client → observe → thread → handler.onResponse(0)
Wrapper[2] → onResponse → <start>
Wrapper[2] → Configuring channel.invokeOnClose
Wrapper[2] → createChannel → <end>
main → channel1 → launch (to consume)
main → channel2 → launch (to consume)
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 0 → <start>
Wrapper[1] → onResponse → send Response 0 → <start>
main → delaying for 5.5 seconds
main → channel1 → consumeEach
main → channel1 → consumed Response(value=0)
main → channel2 → consumeEach
main → channel2 → consumed Response(value=0)
Wrapper[1] → onResponse → send Response 0 → <end>
Wrapper[2] → onResponse → send Response 0 → <end>
Client → observe → thread → handler.onResponse(1)
Wrapper[1] → onResponse → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Client → observe → thread → handler.onResponse(1)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → send Response 1 → <start>
Wrapper[2] → onResponse → send Response 1 → <start>
Client → observe → thread → handler.onResponse(2)
Client → observe → thread → handler.onResponse(2)
Wrapper[2] → onResponse → <start>
Wrapper[1] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 2 → <start>
Wrapper[1] → onResponse → send Response 2 → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Client → observe → thread → handler.onResponse(3)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 3 → <start>
Client → observe → thread → handler.onResponse(3)
Wrapper[1] → onResponse → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → send Response 3 → <start>
Client → observe → thread → handler.onResponse(4)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Client → observe → thread → handler.onResponse(4)
Wrapper[1] → onResponse → <start>
Wrapper[2] → onResponse → send Response 4 → <start>
Wrapper[1] → onResponse → send Response 4 → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
main → channel1 → consumed Response(value=1)
main → channel2 → consumed Response(value=1)
Wrapper[1] → onResponse → send Response 1 → <end>
Wrapper[2] → onResponse → send Response 1 → <end>
Client → observe → thread → handler.onResponse(5)
Wrapper[2] → onResponse → <start>
Client → observe → thread → handler.onResponse(5)
Wrapper[1] → onResponse → <start>
Wrapper[2] → onResponse → send Response 5 → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → send Response 5 → <start>
main → Canceling channel 1
Wrapper[1] → channel.invokeOnClose → <start>
Wrapper[1] → channelJob.invokeOnCompletion → <start>
Wrapper[1] → channelJob.invokeOnCompletion → <end>
Client → Runnable → interrupt
Client → observe → InterruptedException
Wrapper[1] → channel.invokeOnClose → <end>
main → joining channel 1's launch
Client → observe → thread → handler.onResponse(6)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 6 → <start>
Client → observe → thread → handler.onResponse(7)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 7 → <start>
Client → observe → thread → handler.onResponse(8)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 8 → <start>
Client → observe → thread → handler.onResponse(9)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 9 → <start>
main → channel2 → consumed Response(value=2)
main → joining channel 2's launch
Wrapper[2] → onResponse → send Response 2 → <end>
Client → observe → thread → handler.onResponse(10)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 10 → <start>
Client → observe → thread → handler.onResponse(11)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 11 → <start>
Client → observe → thread → handler.onResponse(12)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 12 → <start>
Client → observe → thread → handler.onResponse(13)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 13 → <start>
Client → observe → thread → handler.onResponse(14)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 14 → <start>

...

Demonstrates closing wrapper. In turn, channel 1 and channel 2 will also be closed.

(cancelMode = CancelMode.WRAPPER)

main → threads=[Monitor Ctrl-Break, Signal Dispatcher, Finalizer, main @coroutine#1, Reference Handler]
Wrapper[1] → createChannel → <start>
Wrapper[1] → client.observe
Client → observe
Client → observe → thread → handler.onResponse(0)
Wrapper[1] → onResponse → <start>
Wrapper[1] → Configuring channel.invokeOnClose
Wrapper[1] → createChannel → <end>
Wrapper[2] → createChannel → <start>
Wrapper[2] → client.observe
Client → observe
Wrapper[2] → Configuring channel.invokeOnClose
Wrapper[2] → createChannel → <end>
main → channel1 → launch (to consume)
Client → observe → thread → handler.onResponse(0)
Wrapper[2] → onResponse → <start>
main → channel2 → launch (to consume)
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → send Response 0 → <start>
Wrapper[2] → onResponse → send Response 0 → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
main → delaying for 5.5 seconds
main → channel1 → consumeEach
main → channel1 → consumed Response(value=0)
main → channel2 → consumeEach
main → channel2 → consumed Response(value=0)
Wrapper[1] → onResponse → send Response 0 → <end>
Wrapper[2] → onResponse → send Response 0 → <end>
Client → observe → thread → handler.onResponse(1)
Wrapper[1] → onResponse → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Client → observe → thread → handler.onResponse(1)
Wrapper[2] → onResponse → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → send Response 1 → <start>
Wrapper[2] → onResponse → send Response 1 → <start>
Client → observe → thread → handler.onResponse(2)
Wrapper[1] → onResponse → <start>
Client → observe → thread → handler.onResponse(2)
Wrapper[2] → onResponse → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 2 → <start>
Wrapper[1] → onResponse → send Response 2 → <start>
Client → observe → thread → handler.onResponse(3)
Client → observe → thread → handler.onResponse(3)
Wrapper[1] → onResponse → <start>
Wrapper[2] → onResponse → <start>
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 3 → <start>
Wrapper[1] → onResponse → send Response 3 → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Client → observe → thread → handler.onResponse(4)
Wrapper[1] → onResponse → <start>
Client → observe → thread → handler.onResponse(4)
Wrapper[2] → onResponse → <start>
Wrapper[1] → onResponse → send Response 4 → <start>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[2] → onResponse → send Response 4 → <start>
Client → observe → thread → handler.onResponse(5)
Wrapper[2] → onResponse → <start>
Client → observe → thread → handler.onResponse(5)
Wrapper[1] → onResponse → <start>
main → channel1 → consumed Response(value=1)
Wrapper[2] → onResponse → send Response 5 → <start>
Wrapper[1] → onResponse → send Response 5 → <start>
Wrapper[2] → onResponse → send Response 1 → <end>
Wrapper[2] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
main → channel2 → consumed Response(value=1)
Wrapper[1] → onResponse → <end>
Client → observe → thread → Pausing for 1 sec
Wrapper[1] → onResponse → send Response 1 → <end>
main → Closing wrapper
Wrapper → close → <start>
Wrapper[1] → channelJob.invokeOnCompletion → <start>
Wrapper[1] → channel.invokeOnClose → <start>
Client → Runnable → interrupt
Client → observe → InterruptedException
Wrapper[1] → channel.invokeOnClose → <end>
Wrapper[1] → channelJob.invokeOnCompletion → <end>
Wrapper → close → <end>
main → joining channel 1's launch
Wrapper[2] → channelJob.invokeOnCompletion → <start>
Wrapper[2] → channel.invokeOnClose → <start>
Client → Runnable → interrupt
Client → observe → InterruptedException
Wrapper[2] → channel.invokeOnClose → <end>
Wrapper[2] → channelJob.invokeOnCompletion → <end>
main → joining channel 2's launch
main → threads=[DefaultDispatcher-worker-1, DefaultDispatcher-worker-4, DefaultDispatcher-worker-3, main @coroutine#1, DefaultDispatcher-worker-2, Monitor Ctrl-Break, Signal Dispatcher, Finalizer, Reference Handler]
main → <end>
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.consumeEach
import java.io.Closeable
import java.lang.Thread.sleep
import kotlin.concurrent.thread
import kotlin.coroutines.experimental.CoroutineContext
val threads: Set<Thread>
get() = Thread.getAllStackTraces().keys
val threadNames: List<String>
get() = threads.map { it.name }
enum class CancelMode {
// Demonstrates closing channel 1. Channel 2 continues to be consumable (wrapper is not closed).
CHANNEL,
// Demonstrates closing wrapper. In turn, channel 1 and channel 2 will also be closed.
WRAPPER
}
private val cancelMode = CancelMode.CHANNEL
fun main(args: Array<String>) = runBlocking {
println("main → threads=$threadNames")
val wrapper = Wrapper()
val channel1 = wrapper.createChannel("1")
val channel2 = wrapper.createChannel("2")
println("main → channel1 → launch (to consume)")
val channelJob1 = launch {
println("main → channel1 → consumeEach")
channel1.consumeEach {
println("main → channel1 → consumed $it")
delay(5_000L)
}
}
println("main → channel2 → launch (to consume)")
val channelJob2 = launch {
println("main → channel2 → consumeEach")
channel2.consumeEach {
println("main → channel2 → consumed $it")
delay(5_000L)
}
}
println("main → delaying for 5.5 seconds")
delay(5_500L)
when (cancelMode) {
CancelMode.CHANNEL -> {
println("main → Canceling channel 1")
channel1.cancel()
}
CancelMode.WRAPPER -> {
println("main → Closing wrapper")
wrapper.close()
}
}
println("main → joining channel 1's launch")
channelJob1.join()
println("main → joining channel 2's launch")
channelJob2.join()
println("main → threads=$threadNames")
sleep(1_000L)
println("main → <end>")
}
sealed class Result {
data class Response(val value: Int) : Result()
data class Error(val exception: Exception) : Result()
}
interface Handler {
fun onResponse(v: Int)
fun onError(e: Exception)
}
class Client {
fun observe(handler: Handler): Runnable {
println("Client → observe")
val th = thread {
var i = 0
while (!Thread.currentThread().isInterrupted) {
println("Client → observe → thread → handler.onResponse($i)")
handler.onResponse(i++)
try {
println("Client → observe → thread → Pausing for 1 sec")
sleep(1_000L)
} catch (e: InterruptedException) {
println("Client → observe → InterruptedException")
return@thread
}
}
}
return Runnable {
println("Client → Runnable → interrupt")
th.interrupt()
th.join()
}
}
}
class Wrapper : CoroutineScope, Closeable {
private val job = Job()
override val coroutineContext: CoroutineContext = job
private val client = Client()
fun createChannel(name: String): Channel<Result> {
println("Wrapper[$name] → createChannel → <start>")
val channel = Channel<Result>()
// Job that scopes this individual channel (cancels related Coroutines when Channel is
// closed).
val channelJob = Job(job).apply {
invokeOnCompletion {
println("Wrapper[$name] → channelJob.invokeOnCompletion → <start>")
channel.cancel()
println("Wrapper[$name] → channelJob.invokeOnCompletion → <end>")
}
}
println("Wrapper[$name] → client.observe")
val relation = client.observe(object : Handler {
override fun onResponse(v: Int) {
println("Wrapper[$name] → onResponse → <start>")
launch(channelJob) {
println("Wrapper[$name] → onResponse → send Response $v → <start>")
channel.send(Result.Response(v))
println("Wrapper[$name] → onResponse → send Response $v → <end>")
}
println("Wrapper[$name] → onResponse → <end>")
}
override fun onError(e: Exception) {
println("Wrapper[$name] → onError → <start>")
launch(channelJob) {
println("Wrapper[$name] → onError → send Error $e → <start>")
channel.send(Result.Error(e))
println("Wrapper[$name] → onError → send Error $e → <end>")
}
println("Wrapper[$name] → onError → <end>")
}
})
println("Wrapper[$name] → Configuring channel.invokeOnClose")
channel.invokeOnClose {
println("Wrapper[$name] → channel.invokeOnClose → <start>")
channelJob.cancel()
relation.run()
println("Wrapper[$name] → channel.invokeOnClose → <end>")
}
println("Wrapper[$name] → createChannel → <end>")
return channel
}
override fun close() {
println("Wrapper → close → <start>")
job.cancel()
println("Wrapper → close → <end>")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment