Last active
November 11, 2016 16:09
-
-
Save elizarov/3bccee582ecb54d12850f75122dc7a0f to your computer and use it in GitHub Desktop.
Kotlin coroutines playground
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.InputStream | |
import java.nio.ByteBuffer | |
import java.nio.channels.AsynchronousFileChannel | |
import java.nio.channels.CompletionHandler | |
import java.nio.file.Path | |
import java.nio.file.Paths | |
import java.nio.file.StandardOpenOption.CREATE | |
import java.nio.file.StandardOpenOption.WRITE | |
import java.time.Instant | |
import java.util.concurrent.CompletableFuture | |
import java.util.concurrent.CompletionStage | |
import java.util.concurrent.Executor | |
import java.util.concurrent.ForkJoinPool | |
import java.util.function.BiConsumer | |
import javax.swing.SwingUtilities | |
// =================================================================================================== | |
// modified async / asyncUI / await samples | |
// starts execution until the first suspension in this thread, then returns | |
fun <T> async(coroutine capture: FutureController<T>.() -> Continuation<Unit>): CompletableFuture<T> { | |
// run until suspension right here, then use background threads provided by suspension functions | |
return FutureController<T>().apply { capture().resume(Unit) }.future | |
} | |
// moves execution into specified thread pool and returns immediately with the future | |
fun <T> fork(executor: Executor = ForkJoinPool.commonPool(), | |
coroutine capture: FutureController<T>.() -> Continuation<Unit>): CompletableFuture<T> { | |
val controller = FutureController<T>(executor) | |
val machine = controller.capture() // capture all variables from outer scope while in this thread | |
executor.execute { machine.resume(Unit) } // then move all execution into background thread | |
return controller.future | |
} | |
// in UI thread -> works like async -- runs here until suspension point | |
// NOT in UI thread -> works like fork -- moves execution into UI thread and returns immediately | |
fun <T> asyncUI(coroutine capture: FutureController<T>.() -> Continuation<Unit>): CompletableFuture<T> { | |
val swingExecutor = Executor { command -> SwingUtilities.invokeLater(command) } | |
if (SwingUtilities.isEventDispatchThread()) { | |
// run until suspension right here like async | |
return FutureController<T>(swingExecutor).apply { capture().resume(Unit) }.future | |
} else | |
return fork(swingExecutor, capture) | |
} | |
@AllowSuspendExtensions | |
class FutureController<T>(private val executor: Executor? = null) : AsynchronousController { | |
val future = CompletableFuture<T>() | |
// note: both handleResult & handleException do not actually use continuation | |
operator fun handleResult(value: T, c: Continuation<Nothing>) { | |
future.complete(value) | |
} | |
operator fun handleException(exception: Throwable, c: Continuation<Nothing>) { | |
future.completeExceptionally(exception) | |
} | |
// note -- this 'await' function uses controller only for 'executor' | |
suspend fun <V> await(f: CompletionStage<V>, machine: Continuation<V>) { | |
val action = BiConsumer<V, Throwable> { value, exception -> | |
if (exception == null) | |
machine.resume(value) | |
else | |
machine.resumeWithException(exception) | |
} | |
if (executor == null) | |
f.whenComplete(action) | |
else | |
f.whenCompleteAsync(action, executor) | |
} | |
// this will fix returning to UI thread | |
/*operator*/ override fun interceptRun(block: () -> Unit) { | |
executor?.execute { block() } ?: block() | |
} | |
} | |
interface ControllerWithExecutor { | |
val executor: Executor | |
} | |
interface AsynchronousController { | |
fun interceptRun(block: () -> Unit) | |
} | |
// =================================================================================================== | |
// extension for channels | |
// note, that this suspend function does not use controller at all | |
// it can be applied to any controller at all, but there is a bug: in `asyncUI` it resumes in wrong thread | |
suspend fun FutureController<*>.asyncWriteToFile( | |
path: Path, | |
text: String, | |
machine: Continuation<Unit> | |
) { | |
log("Writing to file $path") | |
val channel = AsynchronousFileChannel.open(path, CREATE, WRITE) | |
val buf = ByteBuffer.wrap(text.toByteArray()) | |
val handler = object : CompletionHandler<Int, Unit> { | |
override fun completed(result: Int, attachment: Unit) { | |
log("Completed writing to file $result bytes") | |
channel.close() | |
machine.resume(Unit) | |
} | |
override fun failed(exception: Throwable, attachment: Unit) { | |
log("Failed writing to file with $exception") | |
channel.close() | |
machine.resumeWithException(exception) | |
} | |
} | |
channel.write(buf, 0, Unit, handler) | |
} | |
// =================================================================================================== | |
// generate / yield example | |
fun <T> generate(coroutine capture: GenerateController<T>.() -> Continuation<Unit>) = object : Sequence<T> { | |
override fun iterator(): Iterator<T> { | |
val controller = GenerateController<T>() // implements Iterator<T> | |
controller.setNextStep(controller.capture()) // capture scope, but don't run continuation yet | |
return controller | |
} | |
} | |
@AllowSuspendExtensions | |
class GenerateController<T> internal constructor() : AbstractIterator<T>() { | |
private lateinit var nextStep: Continuation<Unit> | |
override fun computeNext() { | |
nextStep.resume(Unit) | |
} | |
internal fun setNextStep(step: Continuation<Unit>) { | |
nextStep = step | |
} | |
suspend fun yield(value: T, c: Continuation<Unit>) { | |
setNext(value) | |
setNextStep(c) | |
} | |
operator fun handleResult(result: Unit, c: Continuation<Nothing>) { | |
done() | |
} | |
} | |
annotation class SynchronousController | |
@SynchronousController | |
interface GeneratingController<in T> { | |
suspend fun yield(value: T, c: Continuation<Unit>) | |
} | |
suspend fun <T, R> GenerateController<R>.runAndYield(block: () -> R, c: Continuation<Unit>) { | |
yield(block(), c) | |
} | |
// =================================================================================================== | |
// streamBytes / produceBytes example | |
fun streamBytes(coroutine capture: StreamBytesController.() -> Continuation<Unit>): InputStream { | |
return StreamBytesController().apply { setNextStep(capture()) } // implements InputStream | |
} | |
class StreamBytesController : InputStream() { | |
lateinit var nextStep: Continuation<Unit> | |
var bytes = ByteArray(0) | |
var index = 0 | |
suspend fun yieldBytes(value: ByteArray, c: Continuation<Unit>) { | |
bytes = value | |
index = 0 | |
nextStep = c | |
} | |
internal fun setNextStep(step: Continuation<Unit>) { | |
nextStep = step | |
} | |
override fun read(): Int { | |
while (index >= bytes.size) | |
nextStep.resume(Unit) | |
return bytes[index++].toInt() | |
} | |
} | |
// =================================================================================================== | |
// coroutines with stack | |
@AllowSuspendExtensions | |
class Suspendable<T>(internal coroutine val capture: Suspendable<T>.() -> Continuation<Unit>) { | |
internal lateinit var caller: Continuation<T> | |
// note: both handleResult & handleException do not actually use continuation | |
operator fun handleResult(value: T, c: Continuation<Nothing>) { | |
caller.resume(value) | |
} | |
operator fun handleException(exception: Throwable, c: Continuation<Nothing>) { | |
caller.resumeWithException(exception) | |
} | |
} | |
// note, that this suspend function does not use controller at all, it can be applied to any controller | |
suspend fun <T> FutureController<*>.invoke(suspendable: Suspendable<T>, caller: Continuation<T>) { | |
suspendable.caller = caller | |
val machine = suspendable.capture | |
machine(suspendable).resume(Unit) // start invocation until suspension point | |
} | |
// =================================================================================================== | |
// utility code | |
fun log(msg: String) { | |
println("${Instant.now()} [${Thread.currentThread().name}] $msg") | |
} | |
fun sleep(millis: Long) { | |
Thread.sleep(millis) | |
} | |
// =================================================================================================== | |
// main code to test it | |
// some background task that does long computation | |
fun computeTheAnswer(): CompletableFuture<Int> = fork { | |
log("Start computing the answer") | |
sleep(1000) | |
log("Done computing the answer") | |
42 | |
} | |
// some background task the processed the answer | |
fun processTheAnswer(answer: Int): CompletableFuture<String> = fork { | |
log("Start processing the answer") | |
sleep(1000) | |
val result = "'the answer was $answer'" | |
log("Done processing the answer") | |
result | |
} | |
// some suspendable computation _in_the_same_thread_ | |
fun mangleTheResultSubroutine(result: String) = Suspendable<String> { | |
log("Start mangling the result in subroutine") | |
sleep(1000) | |
val mangled = StringBuilder(result).apply { reverse() }.toString() | |
log("Done mangling the result in subroutine") | |
mangled | |
} | |
fun main(args: Array<String>) { | |
log("Started") | |
val bg = fork<String> { | |
log("Background computation started in fork") | |
sleep(500) // prepare the question | |
val answer = await(computeTheAnswer()) | |
log("Background computation got the answer $answer") | |
sleep(500) // reflect upon the answer | |
val result = await(processTheAnswer(answer)) | |
log("Background computation got the result $result") | |
sleep(500) // appreciate the result | |
log("Background computation finished") | |
result | |
} | |
log("Forked, waiting for background computation") | |
val result = bg.get() | |
log("Background result was: $result") | |
asyncUI<Unit> { | |
log("Display $result in UI") | |
val mangled = invoke(mangleTheResultSubroutine(result)) | |
log("Back to UI with mangled $mangled") | |
asyncWriteToFile(Paths.get("asyncIOtest.tmp"), mangled) | |
log("Are we back to UI thread?") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment