Skip to content

Instantly share code, notes, and snippets.

Last active February 23, 2024 06:51
Show Gist options
  • Save ntoskrnl/f0e8785939d1e4d4efe8380958affbb4 to your computer and use it in GitHub Desktop.
Save ntoskrnl/f0e8785939d1e4d4efe8380958affbb4 to your computer and use it in GitHub Desktop.
Retrofit-style JSON-RPC client in Kotlin (with gson serialization/deserialization)
/**
 * Creates an implementation of the JSON-RPC service interface [service] backed by a
 * dynamic [Proxy].
 *
 * Every call made on the returned object is routed through the invocation handler built by
 * `createInvocationHandler` from the remaining arguments.
 *
 * @param service interface to implement; also supplies the class loader for the proxy
 * @param client transport passed through to the invocation handler
 * @param resultDeserializer converter passed through to the invocation handler
 * @param logger sink for diagnostic messages; does nothing by default
 * @return a proxy instance implementing [service]
 * @throws IllegalArgumentException if [service] is not an interface
 */
fun <T, B> createJsonRpcService(service: Class<T>,
                                client: JsonRpcClient<B>,
                                resultDeserializer: Deserializer<B>,
                                logger: (String) -> Unit = {}): T {
    // Proxy.newProxyInstance rejects non-interfaces with the same exception type;
    // failing here first gives the caller a message that names the offending class.
    require(service.isInterface) {
        "${service.name} must be an interface to be used as a JSON-RPC service"
    }
    val invocationHandler = createInvocationHandler(service, client, resultDeserializer, logger)
    val proxy = Proxy.newProxyInstance(service.classLoader, arrayOf<Class<*>>(service), invocationHandler)
    // Class.cast keeps the conversion checked at runtime, unlike the unchecked `as T`.
    return service.cast(proxy)
}
/**
 * Builds the [InvocationHandler] used by the proxy created for [service].
 *
 * For each intercepted call the handler takes the next id from its own counter, builds an
 * `RpcRequest` from that id, the annotation's `value` and the parameters returned by
 * `jsonRpcParameters`, logs it through [logger], and hands the raw result to
 * [resultDeserializer] together with the type argument of the method's return type.
 * A `null` deserialization result raises `NullJsonRpcCallResultException`.
 *
 * NOTE(review): several expressions in this block are incomplete as they appear here
 * (the argument of `getAnnotation(`, whatever should precede `, { result ->`, and the
 * closing braces). Restore them from the original gist before compiling.
 */
private fun <T, B> createInvocationHandler(service: Class<T>,
client: JsonRpcClient<B>,
resultDeserializer: Deserializer<B>,
logger: (String) -> Unit): InvocationHandler {
return object : InvocationHandler {
// Per-handler id source; starts at 0, so the first request is sent with id 1.
val requestId = AtomicLong(0)
override fun invoke(proxy: Any, method: Method, args: Array<Any?>?): Any {
// When the annotation lookup yields null, the call is not treated as an RPC: the method
// is invoked reflectively on this handler object instead.
// NOTE(review): `args` is passed without the spread operator, so it reaches
// Method.invoke as a single vararg element — confirm this is intended.
val methodAnnotation =
method.getAnnotation( ?: return method.invoke(this, args)
// Reject return types the handler cannot produce.
if (!method.returnsSingle) {
error("Only io.reactivex.Single<T> is supported as return type")
val id = requestId.incrementAndGet()
val methodName = methodAnnotation.value
val parameters = method.jsonRpcParameters(args, service)
val request = RpcRequest(id, methodName, parameters)
// Type the raw result is deserialized into (the return type's first type argument).
val returnType = method.resultGenericTypeArgument
logger("JsonRPC: Calling: $request")
// NOTE(review): the expression before `, { result ->` is missing; presumably a call on
// `client` that receives `request` and this parsing lambda — confirm.
return, { result ->
logger("JsonRPC: Parsing $returnType from result=$result")
resultDeserializer.deserialize(returnType, result) ?: throw NullJsonRpcCallResultException()
/**
 * Turns the positional arguments of a proxied call into a name-to-value map.
 *
 * One annotation is picked per parameter position from `parameterAnnotations`; it must be a
 * `JsonRpcParam`, whose `value` becomes the key. Each key is paired with `args?.get(i)`, so
 * a `null` [args] array produces `null` values.
 *
 * Fails through `error(...)` when the annotation picked for a position is not a `JsonRpcParam`.
 *
 * NOTE(review): the `firstOrNull { }` predicate and both `${}` placeholders in the error
 * message are empty as they appear here, and the closing braces are missing. [service] is
 * presumably meant for the error message — confirm against the original gist.
 */
private fun Method.jsonRpcParameters(args: Array<Any?>?, service: Class<*>): Map<String, Any?> {
return parameterAnnotations
// Reduce each parameter's annotation array to a single candidate annotation (or null).
.map { it?.firstOrNull { } }
// Resolve the JSON-RPC parameter name for each position, rejecting unannotated parameters.
.mapIndexed { i, a ->
when (a) {
is JsonRpcParam -> a.value
else -> error("Argument #$i of ${}#$name()" +
" must be annotated with @${}")
// Pair every resolved name with the argument at the same position.
.mapIndexed { i, name -> name to args?.get(i) }
.associate { it }
/**
 * Whether this method's return type has the canonical name of the supported return type.
 *
 * NOTE(review): the right-hand side of the comparison is missing as it appears here. Judging
 * by the error message at the call site it is presumably the canonical name of
 * `io.reactivex.Single` — confirm against the original gist.
 */
private val Method.returnsSingle: Boolean
get() = returnType.canonicalName ==
/**
 * The first type argument of this method's generic return type, e.g. `Foo` for a method
 * declared to return `Single<Foo>`.
 *
 * @throws IllegalStateException if the return type is not parameterized (for example a raw
 * `Single`), with a message naming the offending method
 */
private val Method.resultGenericTypeArgument: Type
    get() {
        // A raw or non-generic return type is not a ParameterizedType; report that as a
        // declaration error instead of letting a bare ClassCastException escape.
        val parameterized = genericReturnType as? ParameterizedType
            ?: error("Return type of ${declaringClass.name}#$name() must declare a type argument")
        return parameterized.actualTypeArguments.first()
    }
/**
 * Thrown by the invocation handler when the result deserializer returns `null` for a call.
 *
 * Carries a fixed message so the failure is identifiable in logs and stack traces.
 */
private class NullJsonRpcCallResultException : Exception("JSON-RPC call returned a null result")
/**
 * Transport abstraction for issuing a single JSON-RPC call.
 *
 * NOTE(review): this interface is declared without type parameters, yet other code in this
 * file refers to it as `JsonRpcClient<B>`; likewise the parameter type here is `RpcRequest`
 * while the implementation in this file overrides `call` with `JsonRpcRequest`. Confirm
 * which form is intended.
 */
interface JsonRpcClient {
/**
 * Sends [request] and returns a `Single` of type [R], with [responseParser] converting a
 * `JsonElement` into that type.
 */
fun <R> call(request: RpcRequest, responseParser: (JsonElement) -> R): Single<R>
/**
 * Error payload of a JSON-RPC response; its [code] and [message] are copied into the
 * `JsonRpcCallException` raised by the WebSocket client.
 *
 * NOTE(review): the closing parenthesis of the constructor is missing as it appears here.
 */
class JsonRpcError(
val code: Int,
val message: String
/**
 * Outgoing JSON-RPC request; the WebSocket client serializes it with Gson before sending.
 *
 * NOTE(review): the closing parenthesis of the constructor is missing as it appears here.
 */
class JsonRpcRequest(
// Identifier of this request.
val id: Long,
// Name of the remote method to invoke.
val method: String,
// Named arguments of the call; empty when none are supplied.
val params: Map<String, Any?> = emptyMap()
/**
 * Incoming JSON-RPC response.
 *
 * NOTE(review): [error] is declared non-null, yet the WebSocket client in this file checks
 * `response.error != null`. Presumably instances are populated reflectively and the field
 * can be absent — confirm and consider declaring it nullable. The closing parenthesis of the
 * constructor is also missing as it appears here.
 */
data class JsonRpcResponse(
// Identifier of this response.
val id: Long,
// Raw result, handed to the caller-supplied response parser.
val result: JsonElement,
// Error details reported for the call.
val error: JsonRpcError
/**
 * Reactive view of a WebSocket connection whose incoming messages are of type [T].
 *
 * NOTE(review): the closing brace of this interface is missing as it appears here.
 */
interface RxWebSocket<T> {
/** Sends [message]; the returned `Single` emits `Unit`. */
fun sendMessage(message: String): Single<Unit>
/** Stream of incoming messages. */
fun messages(): Observable<T>
/** Stream of `RxWebSocketState` values for the connection. */
fun observeState(): Observable<RxWebSocketState>
/**
 * [JsonRpcClient] implementation that sends requests over an [RxWebSocket] and picks the
 * matching response out of the socket's message stream.
 *
 * NOTE(review): this class is incomplete as it appears here — the argument list of
 * `responses(` and the left-hand side of the `filter` predicate are missing, closing braces
 * are gone, and the body is cut off after the `else` branch. Restore from the original gist
 * before compiling.
 */
internal class RxWebSocketJsonRpcClient(
// NOTE(review): messages are typed `Any`, yet `responses` reads `response.error` and
// `response.result` — presumably a conversion step is missing; confirm.
private val rxWebSocket: RxWebSocket<Any>,
private val gson: Gson,
// Response timeout in milliseconds (passed to `timeout` with TimeUnit.MILLISECONDS).
private val timeout: Long = 5000L,
private val logger: Logger,
private val schedulers: SchedulersFactory) : JsonRpcClient {
/**
 * Serializes [request] with Gson, logs it, sends it over the socket and then continues
 * with `responses(...)` using the configured timeout and [responseParser].
 */
override fun <R> call(request: JsonRpcRequest, responseParser: (JsonElement) -> R): Single<R> {
val requestStr = gson.toJson(request)
logger.d(LOG_TAG, "JsonRpc request = $request")
return rxWebSocket.sendMessage(requestStr)
// NOTE(review): first argument of `responses(` is missing; presumably the request id — confirm.
.flatMapSingle { responses(, timeout, responseParser) }
/**
 * Observes the socket's messages for the response belonging to [requestId] and converts it
 * with [responseParser]; a response carrying an error is turned into a `JsonRpcCallException`.
 */
private fun <T> responses(requestId: Long, timeout: Long, responseParser: (JsonElement) -> T): Single<T> {
return rxWebSocket.messages()
.doOnNext { logger.d(LOG_TAG, "JsonRpc response = $it") }
// If the message stream ends without emitting anything, report that as a closed/failed socket.
.switchIfEmpty(Observable.error(JsonRpcCallException(JSON_RPC_CLOSED, "WS closed or failed")))
// NOTE(review): the left-hand side of the comparison is missing; presumably the response id — confirm.
.filter { response -> == requestId }
// Applied after the filter, so the time limit concerns messages that passed it.
.timeout(timeout, TimeUnit.MILLISECONDS, schedulers.computation())
.flatMap { response ->
when {
response.error != null -> {
val ex = JsonRpcCallException(response.error.code, response.error.message)
else -> {
val result = responseParser(response.result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment