Skip to content

Instantly share code, notes, and snippets.

@ntoskrnl
Last active February 23, 2024 06:51
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save ntoskrnl/f0e8785939d1e4d4efe8380958affbb4 to your computer and use it in GitHub Desktop.
Save ntoskrnl/f0e8785939d1e4d4efe8380958affbb4 to your computer and use it in GitHub Desktop.
Retrofit-style JSON-RPC client in Kotlin (with gson serialization/deserialization)
fun <T, B> createJsonRpcService(service: Class<T>,
client: JsonRpcClient<B>,
resultDeserializer: Deserializer<B>,
logger: (String) -> Unit = {}): T {
val classLoader = service.classLoader
val interfaces = arrayOf<Class<*>>(service)
val invocationHandler = createInvocationHandler(service, client, resultDeserializer, logger)
@Suppress("UNCHECKED_CAST")
return Proxy.newProxyInstance(classLoader, interfaces, invocationHandler) as T
}
private fun <T, B> createInvocationHandler(service: Class<T>,
client: JsonRpcClient<B>,
resultDeserializer: Deserializer<B>,
logger: (String) -> Unit): InvocationHandler {
return object : InvocationHandler {
val requestId = AtomicLong(0)
@Throws(Throwable::class)
override fun invoke(proxy: Any, method: Method, args: Array<Any?>?): Any {
val methodAnnotation =
method.getAnnotation(JsonRpcMethod::class.java) ?: return method.invoke(this, args)
if (!method.returnsSingle) {
error("Only io.reactivex.Single<T> is supported as return type")
}
val id = requestId.incrementAndGet()
val methodName = methodAnnotation.value
val parameters = method.jsonRpcParameters(args, service)
val request = RpcRequest(id, methodName, parameters)
val returnType = method.resultGenericTypeArgument
logger("JsonRPC: Calling: $request")
return client.call(request, { result ->
logger("JsonRPC: Parsing $returnType from result=$result")
resultDeserializer.deserialize(returnType, result) ?: throw NullJsonRpcCallResultException()
})
}
}
}
private fun Method.jsonRpcParameters(args: Array<Any?>?, service: Class<*>): Map<String, Any?> {
return parameterAnnotations
.map { it?.firstOrNull { JsonRpcParam::class.java.isInstance(it) } }
.mapIndexed { i, a ->
when (a) {
is JsonRpcParam -> a.value
else -> error("Argument #$i of ${service.name}#$name()" +
" must be annotated with @${JsonRpcParam::class.java.simpleName}")
}
}
.mapIndexed { i, name -> name to args?.get(i) }
.associate { it }
}
private val Method.returnsSingle: Boolean
get() = returnType.canonicalName == Single::class.java.canonicalName
private val Method.resultGenericTypeArgument: Type
@Suppress("CAST_NEVER_SUCCEEDS")
get() = (this.genericReturnType as ParameterizedType).actualTypeArguments.first()
private class NullJsonRpcCallResultException : Exception()
interface JsonRpcClient {
fun <R> call(request: RpcRequest, responseParser: (JsonElement) -> R): Single<R>
}
class JsonRpcError(
val code: Int,
val message: String
)
class JsonRpcRequest(
val id: Long,
val method: String,
val params: Map<String, Any?> = emptyMap()
)
data class JsonRpcResponse(
val id: Long,
val result: JsonElement,
val error: JsonRpcError
)
interface RxWebSocket<T> {
fun sendMessage(message: String): Single<Unit>
fun messages(): Observable<T>
fun observeState(): Observable<RxWebSocketState>
}
internal class RxWebSocketJsonRpcClient(
private val rxWebSocket: RxWebSocket<Any>,
private val gson: Gson,
private val timeout: Long = 5000L,
private val logger: Logger,
private val schedulers: SchedulersFactory) : JsonRpcClient {
override fun <R> call(request: JsonRpcRequest, responseParser: (JsonElement) -> R): Single<R> {
val requestStr = gson.toJson(request)
logger.d(LOG_TAG, "JsonRpc request = $request")
return rxWebSocket.sendMessage(requestStr)
.flatMapSingle { responses(jsonRpcRequest.id, timeout, responseParser) }
}
private fun <T> responses(requestId: Long, timeout: Long, responseParser: (JsonElement) -> T): Single<T> {
return rxWebSocket.messages()
.observeOn(schedulers.computation())
.ofType(JsonRpcResponse::class.java)
.doOnNext { logger.d(LOG_TAG, "JsonRpc response = $it") }
.takeUntil(
rxWebSocket.observeState()
.skip(1)
.ofType(RxWebSocketState.Disconnected::class.java)
)
.switchIfEmpty(Observable.error(JsonRpcCallException(JSON_RPC_CLOSED, "WS closed or failed")))
.filter { response -> response.id == requestId }
.timeout(timeout, TimeUnit.MILLISECONDS, schedulers.computation())
.firstOrError()
.flatMap { response ->
when {
response.error != null -> {
val ex = JsonRpcCallException(response.error.code, response.error.message)
Single.error(ex)
}
else -> {
val result = responseParser(response.result)
Single.just(result)
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment