Skip to content

Instantly share code, notes, and snippets.

@windoze
Created January 26, 2019 19:02
Show Gist options
  • Save windoze/2bcdc7d10f3604c68eae0b26a5ee6feb to your computer and use it in GitHub Desktop.
Save windoze/2bcdc7d10f3604c68eae0b26a5ee6feb to your computer and use it in GitHub Desktop.
A simple implementation of an RPC infrastructure over the Vert.x EventBus
import com.fasterxml.jackson.core.type.TypeReference
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.core.Vertx
import io.vertx.core.eventbus.Message
import io.vertx.core.json.Json
import io.vertx.core.logging.Logger
import io.vertx.core.logging.LoggerFactory
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.dispatcher
import io.vertx.kotlin.coroutines.toChannel
import kotlinx.coroutines.launch
import java.lang.reflect.Proxy
import kotlin.coroutines.Continuation
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.reflect.full.callSuspend
/**
 * Encodes the receiver as a JSON string by delegating to [Json.encode].
 */
fun Any.stringify(): String {
    return Json.encode(this)
}
/**
 * Payload describing a single RPC call. The client proxy sends it JSON-encoded over the event bus
 * and the server verticle decodes it again.
 *
 * All properties have defaults — presumably so the JSON decoder can instantiate the class without
 * arguments (TODO confirm).
 *
 * @property service name under which the target service object was registered on the server
 * @property method name of the member to invoke on the service object (matched by name only)
 * @property args positional arguments for the call; `null` is treated by the server as "no arguments"
 */
@Suppress("ArrayInDataClass")
data class RpcRequest(
val service: String = "",
val method: String = "",
val args: Array<out Any>? = null
)
/**
 * Payload carrying the result of an RPC call back to the caller.
 *
 * @property response value returned by the invoked service member; the client proxy resumes the
 * suspended caller with this value
 */
data class RpcResponse(
val response: Any? = null
)
/**
 * RpcServerVerticle hosts all RPC service objects.
 *
 * Every message received on [channel] is expected to carry a JSON-encoded [RpcRequest]. The reply
 * is a JSON-encoded [RpcResponse]; any error is reported to the sender via `fail` with code 1 and
 * the error's message.
 *
 * @param channel event-bus address on which requests are consumed
 */
class RpcServerVerticle(private val channel: String) : CoroutineVerticle() {
    /**
     * Adapter that dispatches an [RpcRequest] to a concrete service object.
     */
    private interface RpcServer {
        /**
         * Invokes the member named by [request] and wraps its return value.
         *
         * @throws NoSuchElementException if the service object has no member with that name
         */
        suspend fun processRequest(request: RpcRequest): RpcResponse

        companion object {
            /**
             * Wraps [impl] so that its members can be invoked by name through reflection.
             */
            fun <T : Any> instance(impl: T): RpcServer {
                return object : RpcServer {
                    override suspend fun processRequest(request: RpcRequest): RpcResponse {
                        // TODO: Check signature to support overloading
                        val target = impl::class.members.firstOrNull { it.name == request.method }
                            ?: throw NoSuchElementException(
                                "Method ${request.method} not found in service ${request.service}"
                            )
                        val ret = try {
                            target.callSuspend(impl, *(request.args ?: arrayOf()))
                        } catch (e: java.lang.reflect.InvocationTargetException) {
                            // Reflection may wrap what the target threw; surface the real error
                            // so the caller receives its message instead of a null one.
                            throw e.cause ?: e
                        }
                        return RpcResponse(ret)
                    }
                }
            }
        }
    }

    /** Registered service objects, keyed by the name passed to [register]. */
    private val services: HashMap<String, RpcServer> = hashMapOf()

    /**
     * Starts consuming requests from [channel]; each request is handled in its own coroutine.
     */
    override suspend fun start() {
        // Create the consumer here rather than inside the launched coroutine, so that it is
        // already set up by the time start() returns.
        val requests = vertx.eventBus().consumer<String>(channel).toChannel(vertx)
        launch(vertx.dispatcher()) {
            for (msg in requests) {
                // Start a new coroutine to handle the incoming request to support recursive call
                launch(vertx.dispatcher()) {
                    try {
                        // Decoding happens inside the try so that a malformed request is answered
                        // with a failure instead of being left without any reply.
                        val req = Json.decodeValue(msg.body(), RpcRequest::class.java)
                        val service = services[req.service]
                            ?: throw NoSuchElementException("Service ${req.service} not found")
                        msg.reply(service.processRequest(req).stringify())
                    } catch (e: Throwable) {
                        // Caught broadly so that every request gets an answer; fall back to
                        // toString() because some exceptions carry no message.
                        msg.fail(1, e.message ?: e.toString())
                    }
                }
            }
        }
    }

    /**
     * Register the service object.
     *
     * @param name name clients use in [RpcRequest.service] to address the object
     * @param impl object whose members are invoked by name; it does not have to implement any
     * particular interface
     * @return this verticle, to allow chained registration
     */
    fun <T : Any> register(name: String, impl: T): RpcServerVerticle {
        services[name] = RpcServer.instance(impl)
        return this
    }
}
/**
 * Dynamically create the service proxy object for the given interface.
 *
 * Calls to `suspend` methods of [T] are sent as a JSON-encoded [RpcRequest] to [channel]; the
 * calling coroutine is resumed with the `response` field of the decoded [RpcResponse], or with an
 * exception if the send fails or the reply cannot be decoded.
 *
 * `equals`, `hashCode` and `toString` are answered locally (identity semantics). Any other
 * non-suspend method is not forwarded and returns `null`.
 *
 * @param T service interface to implement
 * @param vertx Vert.x instance whose event bus carries the requests
 * @param channel event-bus address the RPC server listens on
 * @param name name of the target service as registered on the server
 * @return a proxy implementing [T]
 */
inline fun <reified T : Any> getServiceProxy(vertx: Vertx, channel: String, name: String) =
    Proxy.newProxyInstance(T::class.java.classLoader, arrayOf(T::class.java)) { proxy, method, args ->
        val lastArg = args?.lastOrNull()
        if (method.declaringClass == Any::class.java) {
            // equals/hashCode/toString are routed to the handler as well. Returning null for
            // them would break hashing/equality (primitive return types) and toString, so they
            // are answered locally. Checked first so equals(continuation) is not taken for an RPC.
            when (method.name) {
                "equals" -> proxy === args?.firstOrNull()
                "hashCode" -> System.identityHashCode(proxy)
                "toString" -> "RpcProxy(${T::class.java.name}, service=$name, channel=$channel)"
                else -> null
            }
        } else if (lastArg is Continuation<*>) {
            // The last argument of a suspend function is the Continuation object
            @Suppress("UNCHECKED_CAST") val cont = lastArg as Continuation<Any?>
            val callArgs = args.dropLast(1).toTypedArray()
            // Send request to the given channel on the event bus
            vertx.eventBus().send(channel, RpcRequest(name, method.name, callArgs).stringify(),
                Handler<AsyncResult<Message<String>>> { event ->
                    // Resume the suspended coroutine on reply
                    if (event?.succeeded() == true) {
                        // A reply that cannot be decoded must still resume the caller (with the
                        // decoding error); otherwise the coroutine would stay suspended forever.
                        cont.resumeWith(runCatching {
                            Json.decodeValue(event.result().body(),
                                object : TypeReference<RpcResponse>() {}).response
                        })
                    } else {
                        cont.resumeWithException(event?.cause() ?: Exception("Unknown error"))
                    }
                })
            // Suspend the coroutine to wait for the reply
            COROUTINE_SUSPENDED
        } else {
            // The function is not suspend
            null
        }
    } as T
/**
 * Sample service interface used by the demo. A client obtains an implementation through
 * getServiceProxy; the methods are `suspend` because that proxy does not forward non-suspend
 * methods.
 */
interface FooBarSvc {
suspend fun foo(a: Int, b: String): String
suspend fun bar(x: String): Int
}
/**
 * Sample service interface used by the demo for a nested RPC call. The server-side object
 * registered for it in main does not implement this interface; calls are matched by method name.
 */
interface HelloSvc {
suspend fun hello(world: String): String
}
/**
 * Demo client: calls `foo` on the "foobar" service through a proxy, logs the result and then
 * closes the Vert.x instance.
 */
class TestRpcClientVerticle : CoroutineVerticle() {
    private val log: Logger = LoggerFactory.getLogger(this.javaClass)
    private val channel = "test-channel"

    override suspend fun start() {
        val svc: FooBarSvc = getServiceProxy(vertx, channel, "foobar")
        try {
            log.info("Received string is '${svc.foo(42, "world")}'.")
        } finally {
            // Close Vert.x even when the call fails, so a failed demo run does not leave the
            // instance running; the failure itself still propagates to the caller of start().
            vertx.close()
        }
    }
}
/**
 * Demo entry point: deploys the RPC server with two services and, once that deployment has
 * succeeded, deploys the test client.
 */
fun main() {
    val vertx = Vertx.vertx()

    // The implementation does not need to implement the service interface
    class HelloSvcImpl {
        // The method does not need to be suspend
        fun hello(name: String): String = "Hello, $name!"
    }

    class FBSvcImpl : FooBarSvc {
        val svc: HelloSvc = getServiceProxy(vertx, "test-channel", "hello")
        override suspend fun foo(a: Int, b: String): String = "$a $b, ${svc.hello("world")}"
        override suspend fun bar(x: String): Int = x.toInt()
    }

    val server = RpcServerVerticle("test-channel")
        .register("hello", HelloSvcImpl())
        .register("foobar", FBSvcImpl())

    // Deployment is asynchronous: deploy the client only after the server deployment has
    // completed, so the client's first request is not sent before the server is up.
    vertx.deployVerticle(server, Handler<AsyncResult<String>> { deployment ->
        if (deployment.succeeded()) {
            vertx.deployVerticle(TestRpcClientVerticle())
        } else {
            deployment.cause().printStackTrace()
            vertx.close()
        }
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment