Skip to content

Instantly share code, notes, and snippets.

@terrybleger
Last active April 5, 2020 17:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save terrybleger/14aa13917d47f5bd8d6c to your computer and use it in GitHub Desktop.
Save terrybleger/14aa13917d47f5bd8d6c to your computer and use it in GitHub Desktop.
Vertx - Quasar - Kotlin
package util.func
import co.paralleluniverse.fibers.*
import co.paralleluniverse.strands.SuspendableCallable
import co.paralleluniverse.strands.SuspendableRunnable
import io.vertx.core.Context
import io.vertx.core.Future
import io.vertx.core.Vertx
import java.util.concurrent.Executor
/**
 * Runs [block] with a freshly created [AsyncTask] as its receiver, so the
 * body can call [AsyncTask.await] on Vert.x futures.
 *
 * The block is invoked directly by this function; no fiber is created here.
 *
 * @param block code to execute with an [AsyncTask] receiver.
 */
@Suspendable
fun async(block: AsyncTask.() -> Unit) {
    AsyncTask().block()
}
/**
 * Runs [block] with a freshly created [VertxAsyncTask] bound to [vertx] as
 * its receiver, so the body can call [VertxAsyncTask.await] on Vert.x futures.
 *
 * The block is invoked directly by this function; no fiber is created here.
 *
 * @param vertx the Vert.x instance handed to the [VertxAsyncTask].
 * @param block code to execute with a [VertxAsyncTask] receiver.
 */
@Suspendable
fun async(vertx: Vertx, block: VertxAsyncTask.() -> Unit) {
    VertxAsyncTask(vertx).block()
}
/**
 * Waits for [future] and returns its result.
 *
 * A new [Fiber] running a [FutureCallable] for the future is started (no
 * scheduler is passed to the fiber), and the caller then waits on the
 * fiber's `get()`. Anything thrown by `get()` propagates to the caller.
 *
 * @param future the Vert.x future to wait for.
 * @return the value produced by the fiber that awaited [future].
 */
@Suspendable
fun <T> yield(future: Future<T>): T {
    val fiber = Fiber(FutureCallable(future))
    return fiber.start().get()
}
/**
 * Waits for [future] and returns its result, using a fiber obtained from
 * [FiberFactory.create] for the given [vertx] instance.
 *
 * The fiber is started and the caller then waits on its `get()`. Anything
 * thrown by `get()` propagates to the caller.
 *
 * @param vertx the Vert.x instance passed to [FiberFactory].
 * @param future the Vert.x future to wait for.
 * @return the value produced by the fiber that awaited [future].
 */
@Suspendable
fun <T> yield(vertx: Vertx, future: Future<T>): T {
    val fiber = FiberFactory.create(vertx, future)
    return fiber.start().get()
}
/**
 * Starts a fiber, created through [FiberFactory] for [vertx], that invokes
 * [callback].
 *
 * The fiber is only started; this function does not wait for it to finish
 * and does not return its result.
 *
 * @param vertx the Vert.x instance passed to [FiberFactory].
 * @param callback the code to run inside the new fiber.
 */
@Suspendable
fun block(vertx: Vertx, callback: () -> Unit) {
    val runner = SuspendableRunner { callback() }
    FiberFactory.create(vertx, runner).start()
}
/**
 * Receiver type for `async { ... }` blocks; exposes [await] for waiting on
 * Vert.x futures. Open so that subclasses can change how the waiting fiber
 * is created.
 */
open class AsyncTask {
    /**
     * Waits for [future] and returns its result.
     *
     * Starts a new [Fiber] (constructed without an explicit scheduler) that
     * runs a [FutureCallable] for the future, then waits on the fiber's
     * `get()`. Anything thrown by `get()` propagates to the caller.
     *
     * @param future the Vert.x future to wait for.
     * @return the value produced by the fiber that awaited [future].
     */
    @Suspendable
    open fun <T> await(future: Future<T>): T {
        return Fiber(FutureCallable(future)).start().get()
    }
}
/**
 * [AsyncTask] variant whose [await] obtains its fiber from [FiberFactory]
 * for a specific Vert.x instance.
 *
 * @property vertx the Vert.x instance passed to [FiberFactory].
 */
class VertxAsyncTask(val vertx: Vertx) : AsyncTask() {
    /**
     * Waits for [future] and returns its result, using a fiber created by
     * [FiberFactory.create] for [vertx]. Anything thrown by the fiber's
     * `get()` propagates to the caller.
     *
     * @param future the Vert.x future to wait for.
     * @return the value produced by the fiber that awaited [future].
     */
    @Suspendable
    override fun <T> await(future: Future<T>): T {
        return FiberFactory.create(vertx, FutureCallable(future)).start().get()
    }
}
/**
 * [Executor] adapter that forwards every submitted command to a Vert.x
 * [Context] via `runOnContext`.
 *
 * @property context the Vert.x context that commands are handed to.
 */
class ContextExecutor(val context: Context) : Executor {
    /** Hands [command] to [context]; the command's `run()` is called from the context's handler. */
    override fun execute(command: Runnable) = context.runOnContext { command.run() }
}
/**
 * Factory for fibers whose scheduler hands work to a Vert.x context through
 * [ContextExecutor]. All factory functions live on the companion object.
 */
class FiberFactory {
    companion object {
        /**
         * Builds a scheduler named "vertx" that executes through the context
         * returned by `vertx.getOrCreateContext()`.
         *
         * NOTE(review): a new [FiberExecutorScheduler] is constructed on every
         * call, i.e. once per created fiber. Confirm against the Quasar docs
         * whether schedulers are cheap enough for that or should be reused.
         */
        private fun createScheduler(vertx: Vertx): FiberScheduler {
            val executor = ContextExecutor(vertx.getOrCreateContext())
            return FiberExecutorScheduler("vertx", executor)
        }

        /**
         * Creates (but does not start) a fiber that waits for [future].
         *
         * @param vertx the Vert.x instance whose context backs the scheduler.
         * @param future the Vert.x future the fiber will wait for.
         */
        fun <T> create(vertx: Vertx, future: Future<T>): Fiber<T> {
            return create(vertx, FutureCallable(future))
        }

        /**
         * Creates (but does not start) a [FutureFiber] running [callable].
         *
         * @param vertx the Vert.x instance whose context backs the scheduler.
         * @param callable the callable the fiber will run.
         */
        fun <T> create(vertx: Vertx, callable: FutureCallable<T>): Fiber<T> {
            return FutureFiber(createScheduler(vertx), callable)
        }

        /**
         * Creates (but does not start) a fiber running [runner].
         *
         * @param vertx the Vert.x instance whose context backs the scheduler.
         * @param runner the runnable the fiber will run.
         */
        fun create(vertx: Vertx, runner: SuspendableRunner): Fiber<Unit> {
            return Fiber(createScheduler(vertx), runner)
        }
    }
}
/**
 * [Fiber] subclass that runs a [FutureCallable] on the supplied scheduler.
 * It adds no members of its own; both arguments go straight to [Fiber].
 */
class FutureFiber<T>(
    scheduler: FiberScheduler,
    callable: FutureCallable<T>
) : Fiber<T>(scheduler, callable)
/**
 * [SuspendableCallable] whose result is the outcome of a Vert.x [Future].
 *
 * @param future the future whose result is returned by [run].
 */
class FutureCallable<T>(private val future: Future<T>) : SuspendableCallable<T> {
    /**
     * Delegates to a new [AsyncHandler] for the wrapped future and returns
     * whatever its `run()` returns.
     */
    @Suspendable
    override fun run(): T {
        return AsyncHandler(future).run()
    }
}
/**
 * [FiberAsync] bridge for a Vert.x [Future]: registers a handler on the
 * future and reports its outcome through `asyncCompleted` / `asyncFailed`.
 *
 * @param future the future whose outcome is forwarded.
 */
class AsyncHandler<T>(private val future: Future<T>) : FiberAsync<T, Throwable>() {
    /** Installs the handler that forwards the future's result or failure cause. */
    override fun requestAsync() {
        future.setHandler { outcome ->
            when {
                outcome.succeeded() -> asyncCompleted(outcome.result())
                else -> asyncFailed(outcome.cause())
            }
        }
    }
}
/**
 * [SuspendableRunnable] that runs a lambda with this runner as its receiver.
 *
 * @param callback the code invoked by [run].
 */
class SuspendableRunner(private val callback: SuspendableRunner.() -> Unit) : SuspendableRunnable {
    /** Invokes the wrapped callback with `this` as receiver. */
    @Suspendable
    override fun run() {
        callback()
    }
}
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.dataflow.Var
import io.vertx.core.Future
import io.vertx.core.Vertx
import util.func.async
import util.func.block
import util.func.yield
/**
 * Demo entry point: creates a Vert.x instance and runs [example] through
 * [block].
 */
fun main(args: Array<String>) {
    val vertx: Vertx = Vertx.vertx()
    // Every blocking function must be called from inside `block`,
    // and every blocking function must be annotated with @Suspendable.
    val entryPoint: () -> Unit = { example(vertx) }
    block(vertx, entryPoint)
}
/**
 * Demonstrates the three waiting styles used in this file: a Quasar
 * dataflow [Var], the `async { await(...) }` block, and the top-level
 * `yield(...)` function.
 *
 * @param vertx the Vert.x instance used to schedule the timer.
 */
@Suspendable
fun example(vertx: Vertx) {
    // Testing vars: the timer sets the Var after 1000 ms; get() returns its value.
    val v = Var<Int>()
    vertx.setTimer(1000) { v.set(1) }
    println(v.get())

    // Testing async: each await(...) returns the value of its future.
    async {
        println(await(simple(1)))
        println(await(simple(2)))
        println(await(simple(3)))
    }

    // Testing yield: same as above, without the async block.
    println(yield(simple(1)))
    println(yield(simple(2)))
    println(yield(simple(3)))
}
/**
 * Returns a new Vert.x [Future] that has already been completed with [value].
 *
 * @param value the result to complete the future with.
 * @return the completed future.
 */
fun <T> simple(value: T): Future<T> =
    Future.future<T>().apply { complete(value) }
# add this to Run/Debug Configurations inside VM Options:
-javaagent:libs/quasar-core-0.7.2-jdk8.jar -Dco.paralleluniverse.fibers.verifyInstrumentation=true
# set verifyInstrumentation=true only during development
# it helps you find out which functions need @Suspendable
-Dco.paralleluniverse.fibers.verifyInstrumentation=true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment