Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Vertx async wrappers
import io.vertx.core.AbstractVerticle
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
class VerticleWithAsync : AbstractVerticle() {
override fun start() {
val eb = vertx.eventBus()
launch(CurrentVertx) { // requires to be inside a vertx-supplied thread
println("ayy")
delay(1000)
println("lmao") // this continuation will be executed on the verticle's context
val rainbowMessage = eb.sendAsync<String>("somewhere over the rainbow", "uh anyone here??") // suspend point
val body = rainbowMessage.body() // also executed on verticle's context
println(body)
delay(500)
rainbowMessage.reply("yes i'm here")
}
}
}
import io.vertx.core.AsyncResult
import io.vertx.core.Context
import io.vertx.core.Handler
import io.vertx.core.eventbus.EventBus
import io.vertx.core.eventbus.Message
import io.vertx.core.impl.VertxImpl
import kotlinx.coroutines.experimental.CancellableContinuation
import kotlinx.coroutines.experimental.Delay
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.*
class VertxContinuation<in T>(val vertxContext: Context, val cont: Continuation<T>) : Continuation<T> by cont {
override fun resume(value: T) {
vertxContext.runOnContext { cont.resume(value) }
}
override fun resumeWithException(exception: Throwable) {
vertxContext.runOnContext { cont.resumeWithException(exception) }
}
}
object CurrentVertx : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor, Delay {
val vertxContext: Context
get() = VertxImpl.context() ?: throw IllegalStateException("Can't use CurrentVertx if not in a vertx-supplied thread")
override fun <T> interceptContinuation(continuation: Continuation<T>) = VertxContinuation(vertxContext, continuation)
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
vertxContext.owner().setTimer(unit.toMillis(time)) { continuation.resume(Unit) }
}
}
inline suspend fun <T> vx(crossinline callback: (Handler<AsyncResult<T>>) -> Unit) = suspendCoroutine<T> { cont ->
callback(Handler { result: AsyncResult<T> ->
if (result.succeeded()) {
cont.resume(result.result())
} else {
cont.resumeWithException(result.cause())
}
})
}
// wrapper around message in case it's not needed to avoid calling .body() all the time
inline suspend fun <T> vxm(crossinline callback: (Handler<AsyncResult<Message<T>>>) -> Unit) = suspendCoroutine<T> { cont ->
callback(Handler { result: AsyncResult<Message<T>> ->
if (result.succeeded()) {
cont.resume(result.result().body())
} else {
cont.resumeWithException(result.cause())
}
})
}
// example of a more specific vert.x method wrapper
suspend fun <TReply> EventBus.sendAsync(address: String, obj: Any) = vx<Message<TReply>> {
send(address, obj, it)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.