Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active November 11, 2016 16:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elizarov/3bccee582ecb54d12850f75122dc7a0f to your computer and use it in GitHub Desktop.
Save elizarov/3bccee582ecb54d12850f75122dc7a0f to your computer and use it in GitHub Desktop.
Kotlin coroutines playground
import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardOpenOption.CREATE
import java.nio.file.StandardOpenOption.WRITE
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.Executor
import java.util.concurrent.ForkJoinPool
import java.util.function.BiConsumer
import javax.swing.SwingUtilities
// ===================================================================================================
// modified async / asyncUI / await samples
// starts execution until the first suspension in this thread, then returns
fun <T> async(coroutine capture: FutureController<T>.() -> Continuation<Unit>): CompletableFuture<T> {
// run until suspension right here, then use background threads provided by suspension functions
return FutureController<T>().apply { capture().resume(Unit) }.future
}
// moves execution into specified thread pool and returns immediately with the future
fun <T> fork(executor: Executor = ForkJoinPool.commonPool(),
coroutine capture: FutureController<T>.() -> Continuation<Unit>): CompletableFuture<T> {
val controller = FutureController<T>(executor)
val machine = controller.capture() // capture all variables from outer scope while in this thread
executor.execute { machine.resume(Unit) } // then move all execution into background thread
return controller.future
}
// in UI thread -> works like async -- runs here until suspension point
// NOT in UI thread -> works like fork -- moves execution into UI thread and returns immediately
fun <T> asyncUI(coroutine capture: FutureController<T>.() -> Continuation<Unit>): CompletableFuture<T> {
val swingExecutor = Executor { command -> SwingUtilities.invokeLater(command) }
if (SwingUtilities.isEventDispatchThread()) {
// run until suspension right here like async
return FutureController<T>(swingExecutor).apply { capture().resume(Unit) }.future
} else
return fork(swingExecutor, capture)
}
@AllowSuspendExtensions
class FutureController<T>(private val executor: Executor? = null) : AsynchronousController {
val future = CompletableFuture<T>()
// note: both handleResult & handleException do not actually use continuation
operator fun handleResult(value: T, c: Continuation<Nothing>) {
future.complete(value)
}
operator fun handleException(exception: Throwable, c: Continuation<Nothing>) {
future.completeExceptionally(exception)
}
// note -- this 'await' function uses controller only for 'executor'
suspend fun <V> await(f: CompletionStage<V>, machine: Continuation<V>) {
val action = BiConsumer<V, Throwable> { value, exception ->
if (exception == null)
machine.resume(value)
else
machine.resumeWithException(exception)
}
if (executor == null)
f.whenComplete(action)
else
f.whenCompleteAsync(action, executor)
}
// this will fix returning to UI thread
/*operator*/ override fun interceptRun(block: () -> Unit) {
executor?.execute { block() } ?: block()
}
}
interface ControllerWithExecutor {
val executor: Executor
}
interface AsynchronousController {
fun interceptRun(block: () -> Unit)
}
// ===================================================================================================
// extension for channels
// note, that this suspend function does not use controller at all
// it can be applied to any controller at all, but there is a bug: in `asyncUI` it resumes in wrong thread
suspend fun FutureController<*>.asyncWriteToFile(
path: Path,
text: String,
machine: Continuation<Unit>
) {
log("Writing to file $path")
val channel = AsynchronousFileChannel.open(path, CREATE, WRITE)
val buf = ByteBuffer.wrap(text.toByteArray())
val handler = object : CompletionHandler<Int, Unit> {
override fun completed(result: Int, attachment: Unit) {
log("Completed writing to file $result bytes")
channel.close()
machine.resume(Unit)
}
override fun failed(exception: Throwable, attachment: Unit) {
log("Failed writing to file with $exception")
channel.close()
machine.resumeWithException(exception)
}
}
channel.write(buf, 0, Unit, handler)
}
// ===================================================================================================
// generate / yield example
fun <T> generate(coroutine capture: GenerateController<T>.() -> Continuation<Unit>) = object : Sequence<T> {
override fun iterator(): Iterator<T> {
val controller = GenerateController<T>() // implements Iterator<T>
controller.setNextStep(controller.capture()) // capture scope, but don't run continuation yet
return controller
}
}
@AllowSuspendExtensions
class GenerateController<T> internal constructor() : AbstractIterator<T>() {
private lateinit var nextStep: Continuation<Unit>
override fun computeNext() {
nextStep.resume(Unit)
}
internal fun setNextStep(step: Continuation<Unit>) {
nextStep = step
}
suspend fun yield(value: T, c: Continuation<Unit>) {
setNext(value)
setNextStep(c)
}
operator fun handleResult(result: Unit, c: Continuation<Nothing>) {
done()
}
}
annotation class SynchronousController
@SynchronousController
interface GeneratingController<in T> {
suspend fun yield(value: T, c: Continuation<Unit>)
}
suspend fun <T, R> GenerateController<R>.runAndYield(block: () -> R, c: Continuation<Unit>) {
yield(block(), c)
}
// ===================================================================================================
// streamBytes / produceBytes example
fun streamBytes(coroutine capture: StreamBytesController.() -> Continuation<Unit>): InputStream {
return StreamBytesController().apply { setNextStep(capture()) } // implements InputStream
}
class StreamBytesController : InputStream() {
lateinit var nextStep: Continuation<Unit>
var bytes = ByteArray(0)
var index = 0
suspend fun yieldBytes(value: ByteArray, c: Continuation<Unit>) {
bytes = value
index = 0
nextStep = c
}
internal fun setNextStep(step: Continuation<Unit>) {
nextStep = step
}
override fun read(): Int {
while (index >= bytes.size)
nextStep.resume(Unit)
return bytes[index++].toInt()
}
}
// ===================================================================================================
// coroutines with stack
@AllowSuspendExtensions
class Suspendable<T>(internal coroutine val capture: Suspendable<T>.() -> Continuation<Unit>) {
internal lateinit var caller: Continuation<T>
// note: both handleResult & handleException do not actually use continuation
operator fun handleResult(value: T, c: Continuation<Nothing>) {
caller.resume(value)
}
operator fun handleException(exception: Throwable, c: Continuation<Nothing>) {
caller.resumeWithException(exception)
}
}
// note, that this suspend function does not use controller at all, it can be applied to any controller
suspend fun <T> FutureController<*>.invoke(suspendable: Suspendable<T>, caller: Continuation<T>) {
suspendable.caller = caller
val machine = suspendable.capture
machine(suspendable).resume(Unit) // start invocation until suspension point
}
// ===================================================================================================
// utility code
fun log(msg: String) {
println("${Instant.now()} [${Thread.currentThread().name}] $msg")
}
fun sleep(millis: Long) {
Thread.sleep(millis)
}
// ===================================================================================================
// main code to test it
// some background task that does long computation
fun computeTheAnswer(): CompletableFuture<Int> = fork {
log("Start computing the answer")
sleep(1000)
log("Done computing the answer")
42
}
// some background task the processed the answer
fun processTheAnswer(answer: Int): CompletableFuture<String> = fork {
log("Start processing the answer")
sleep(1000)
val result = "'the answer was $answer'"
log("Done processing the answer")
result
}
// some suspendable computation _in_the_same_thread_
fun mangleTheResultSubroutine(result: String) = Suspendable<String> {
log("Start mangling the result in subroutine")
sleep(1000)
val mangled = StringBuilder(result).apply { reverse() }.toString()
log("Done mangling the result in subroutine")
mangled
}
fun main(args: Array<String>) {
log("Started")
val bg = fork<String> {
log("Background computation started in fork")
sleep(500) // prepare the question
val answer = await(computeTheAnswer())
log("Background computation got the answer $answer")
sleep(500) // reflect upon the answer
val result = await(processTheAnswer(answer))
log("Background computation got the result $result")
sleep(500) // appreciate the result
log("Background computation finished")
result
}
log("Forked, waiting for background computation")
val result = bg.get()
log("Background result was: $result")
asyncUI<Unit> {
log("Display $result in UI")
val mangled = invoke(mangleTheResultSubroutine(result))
log("Back to UI with mangled $mangled")
asyncWriteToFile(Paths.get("asyncIOtest.tmp"), mangled)
log("Are we back to UI thread?")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment