Instantly share code, notes, and snippets.

Embed
What would you like to do?
Blocking coroutine behavior
package com.example
import kotlinx.coroutines.experimental.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
/**
 * Stand-in for a blocking operation.
 *
 * Prints `started foo(x)`, blocks the calling thread for one second, and then either
 * fails or produces a result.
 *
 * @param x the input value
 * @return `x + 1`
 * @throws Exception with the message "Too big" when [x] is 5
 */
fun blockingFoo(x: Int): Int {
    println("started foo($x)")
    Thread.sleep(1000)
    // 5 is the one input that fails; every other value falls through to the throw-free path.
    if (x != 5) {
        return x + 1
    }
    throw Exception("Too big")
}
/**
 * Launches the demo job on [CommonPool].
 *
 * The job passes 1 through 10, in order, to [suspendingFoo] and prints each result as
 * `next: <result>`. Any [Throwable] raised while doing so ends the loop and is reported as
 * `caught: <message>`. `job ended` is printed on every exit path.
 *
 * @param suspendingFoo suspending function invoked once per value; the callers in this file
 * pass wrappers around [blockingFoo]
 * @return the launched [Job]
 */
fun execute(suspendingFoo: suspend (Int) -> Int): Job = launch(CommonPool) {
    try {
        (1..10).forEach { value ->
            val next = suspendingFoo(value)
            println("next: $next")
        }
    } catch (failure: Throwable) {
        println("caught: ${failure.message}")
    } finally {
        println("job ended")
    }
}
/**
 * Runs one cancellation scenario: prints a banner, starts the job via [execute], cancels it
 * part-way through, then waits so that any late output still appears under the same banner.
 *
 * @param label name of the wrapper under test; printed as `--<label>()---`
 * @param suspendingFoo the suspending wrapper handed to [execute]
 */
private suspend fun runScenario(label: String, suspendingFoo: suspend (Int) -> Int) {
    println("\n--$label()---")
    val job = execute(suspendingFoo)
    delay(4500) // timed to end during blockingFoo(5)'s Thread.sleep()
    println("canceling")
    job.cancel()
    delay(2000) // leave time for output produced after the cancel request
}

/**
 * Entry point: runs the same cancel-while-blocked scenario against each wrapper in turn
 * ([runFoo], [runFooWrapped], [runFooExecutor], [runFooAsync]).
 *
 * @param args command-line arguments (unused)
 */
fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        runScenario("runFoo") { runFoo(it) }
        runScenario("runFooWrapped") { runFooWrapped(it) }
        runScenario("runFooExecutor") { runFooExecutor(it) }
        // This banner previously read "runFooExecutor" although the scenario runs runFooAsync.
        runScenario("runFooAsync") { runFooAsync(it) }
    } finally {
        // Executors.newSingleThreadExecutor() uses a non-daemon worker thread; without a
        // shutdown the JVM would stay alive after main returns.
        singleThreadExecutor.shutdown()
    }
}
// Coroutine context created by newSingleThreadContext with the name "BlockingThread".
// runFoo, runFooWrapped and runFooAsync run blockingFoo in this context.
val singleThreadContext = newSingleThreadContext("BlockingThread")
// Plain java.util.concurrent single-thread executor; runFooExecutor hands blockingFoo to it.
val singleThreadExecutor: ExecutorService = Executors.newSingleThreadExecutor()
/**
 * Calls [blockingFoo] inside a plain [run] on [singleThreadContext].
 *
 * Behaviour noted by the original author: [Job.cancel] on the calling job is ignored.
 *
 * @param i value forwarded to [blockingFoo]
 * @return the result of [blockingFoo]
 */
suspend fun runFoo(i: Int): Int {
    return run(singleThreadContext) {
        blockingFoo(i)
    }
}
/**
 * Like [runFoo], except that the [blockingFoo] call is made from inside a
 * [suspendCancellableCoroutine] block, still within [run] on [singleThreadContext].
 *
 * [blockingFoo] is called synchronously inside the block, and the continuation is resumed
 * with its result or its failure. Behaviour noted by the original author: same result as
 * [runFoo].
 *
 * @param i value forwarded to [blockingFoo]
 * @return the result of [blockingFoo]
 */
suspend fun runFooWrapped(i: Int): Int {
    return run(singleThreadContext) {
        suspendCancellableCoroutine<Int> { continuation ->
            try {
                val result = blockingFoo(i)
                continuation.resume(result)
            } catch (failure: Throwable) {
                if (continuation.isCompleted) {
                    // The continuation has already finished; the failure is dropped.
                } else {
                    continuation.resumeWithException(failure)
                }
            }
        }
    }
}
/**
 * Suspends the caller and runs [blockingFoo] as a task on [singleThreadExecutor]; the task
 * resumes the continuation with the result or the failure.
 *
 * Behaviour noted by the original author: this variant has the expected behavior when the
 * calling job is cancelled.
 *
 * @param i value forwarded to [blockingFoo]
 * @return the result of [blockingFoo]
 */
suspend fun runFooExecutor(i: Int): Int = suspendCancellableCoroutine { continuation ->
    val task = Runnable {
        try {
            val result = blockingFoo(i)
            continuation.resume(result)
        } catch (failure: Throwable) {
            if (continuation.isCompleted) {
                // The continuation has already finished; the failure is dropped.
            } else {
                continuation.resumeWithException(failure)
            }
        }
    }
    singleThreadExecutor.execute(task)
}
/**
 * Starts [blockingFoo] with [async] on [singleThreadContext] and awaits the result.
 *
 * Behaviour noted by the original author: the calling job is cancelled, but [blockingFoo]
 * keeps running, and with nobody left to catch its exception the program crashes. The author
 * considered this possibly correct but surprising.
 *
 * @param i value forwarded to [blockingFoo]
 * @return the result of [blockingFoo]
 */
suspend fun runFooAsync(i: Int): Int {
    val deferred = async(singleThreadContext) { blockingFoo(i) }
    return deferred.await()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment