Vertx async wrappers
| import io.vertx.core.AbstractVerticle | |
| import kotlinx.coroutines.experimental.delay | |
| import kotlinx.coroutines.experimental.launch | |
/**
 * Sample verticle showing how a coroutine launched with [CurrentVertx]
 * interleaves delays and event-bus calls.
 */
class VerticleWithAsync : AbstractVerticle() {
    override fun start() {
        val bus = vertx.eventBus()
        // CurrentVertx only works when used from a vertx-supplied thread.
        launch(CurrentVertx) {
            println("ayy")
            delay(1000)
            // This continuation is executed on the verticle's context.
            println("lmao")
            // Suspension point: resumes once the reply (or a failure) arrives.
            val reply = bus.sendAsync<String>("somewhere over the rainbow", "uh anyone here??")
            // Still on the verticle's context here.
            println(reply.body())
            delay(500)
            reply.reply("yes i'm here")
        }
    }
}
| import io.vertx.core.AsyncResult | |
| import io.vertx.core.Context | |
| import io.vertx.core.Handler | |
| import io.vertx.core.eventbus.EventBus | |
| import io.vertx.core.eventbus.Message | |
| import io.vertx.core.impl.VertxImpl | |
| import kotlinx.coroutines.experimental.CancellableContinuation | |
| import kotlinx.coroutines.experimental.Delay | |
| import java.util.concurrent.TimeUnit | |
| import kotlin.coroutines.experimental.* | |
/**
 * Continuation wrapper that forwards every resumption of [cont] through
 * [vertxContext]`.runOnContext`, rather than resuming it directly on the
 * calling thread.
 *
 * @param vertxContext the Vert.x context on which [cont] is resumed
 * @param cont the wrapped continuation; its coroutine context is exposed unchanged
 */
class VertxContinuation<in T>(val vertxContext: Context, val cont: Continuation<T>) : Continuation<T> {
    /** The coroutine context of the wrapped continuation. */
    override val context: CoroutineContext
        get() = cont.context

    /** Schedules a successful resumption of [cont] with [value] on [vertxContext]. */
    override fun resume(value: T) = vertxContext.runOnContext { cont.resume(value) }

    /** Schedules a failed resumption of [cont] with [exception] on [vertxContext]. */
    override fun resumeWithException(exception: Throwable) =
        vertxContext.runOnContext { cont.resumeWithException(exception) }
}
/**
 * Coroutine context element that intercepts continuations and resumes them on
 * the Vert.x context of the thread that is current at interception time.
 * It also implements [Delay] using Vert.x timers.
 */
object CurrentVertx : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor, Delay {
    /**
     * The Vert.x context associated with the calling thread.
     *
     * @throws IllegalStateException if the calling thread has no Vert.x context
     */
    val vertxContext: Context
        get() = VertxImpl.context()
            ?: throw IllegalStateException("Can't use CurrentVertx if not in a vertx-supplied thread")

    /** Wraps [continuation] so that its resumptions are dispatched via [vertxContext]. */
    override fun <T> interceptContinuation(continuation: Continuation<T>) =
        VertxContinuation(vertxContext, continuation)

    /**
     * Resumes [continuation] after the given delay using a one-shot Vert.x timer.
     *
     * @param time amount of time to wait, expressed in [unit]
     * @param unit unit of [time]
     * @param continuation continuation to resume with [Unit] when the timer fires
     */
    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
        // Sub-millisecond delays truncate to 0 ms, and Vert.x rejects timers with a
        // delay below 1 ms, so round such delays up to the smallest timer it accepts.
        val delayMillis = unit.toMillis(time).coerceAtLeast(1L)
        // NOTE(review): the timer id is discarded, so the timer is not cancelled if the
        // continuation is cancelled before it fires.
        vertxContext.owner().setTimer(delayMillis) { continuation.resume(Unit) }
    }
}
/**
 * Adapts a Vert.x callback-style call into a suspending call.
 *
 * [callback] receives a handler to pass to the Vert.x API. The coroutine
 * resumes with the result when the handler reports success, and with the
 * failure cause as an exception otherwise.
 *
 * @param callback invoked once with the handler to register
 * @return the result carried by the successful [AsyncResult]
 */
suspend inline fun <T> vx(crossinline callback: (Handler<AsyncResult<T>>) -> Unit): T =
    suspendCoroutine<T> { continuation ->
        val handler = Handler<AsyncResult<T>> { outcome ->
            if (outcome.succeeded()) {
                continuation.resume(outcome.result())
            } else {
                continuation.resumeWithException(outcome.cause())
            }
        }
        callback(handler)
    }
/**
 * Variant of the callback wrapper for handlers that deliver a [Message]:
 * resumes with the message body directly, so callers that do not need the
 * message itself can skip calling `.body()` every time.
 *
 * @param callback invoked once with the handler to register
 * @return the body of the message carried by the successful [AsyncResult]
 */
suspend inline fun <T> vxm(crossinline callback: (Handler<AsyncResult<Message<T>>>) -> Unit): T =
    suspendCoroutine<T> { continuation ->
        callback(Handler { reply: AsyncResult<Message<T>> ->
            when {
                reply.succeeded() -> continuation.resume(reply.result().body())
                else -> continuation.resumeWithException(reply.cause())
            }
        })
    }
/**
 * Example of a more specific Vert.x method wrapper: sends [obj] to [address]
 * and suspends until the reply handler is invoked.
 *
 * @param address event-bus address to send to
 * @param obj message payload
 * @return the reply message
 */
suspend fun <TReply> EventBus.sendAsync(address: String, obj: Any): Message<TReply> =
    vx<Message<TReply>> { replyHandler ->
        send(address, obj, replyHandler)
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Minikloon commented Mar 24, 2017
Based on https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md#wrapping-callbacks