Skip to content

Instantly share code, notes, and snippets.

@ntoskrnl
Last active August 17, 2019 11:22
Show Gist options
  • Save ntoskrnl/114a7b1ac9860423963eea16fe614300 to your computer and use it in GitHub Desktop.
Save ntoskrnl/114a7b1ac9860423963eea16fe614300 to your computer and use it in GitHub Desktop.
A simple JSON-RPC client over RxWebSocket
/**
 * [JsonRpcClient] implementation that exchanges JSON-RPC 2.0 messages over an [RxWebSocket].
 *
 * Requests are serialized with [gson] and sent as text messages; the matching response is
 * picked out of the incoming message stream by comparing the response `id` with the request `id`.
 */
class JsonRpcClientImpl(
    private val rxWebSocket: RxWebSocket<String>,
    private val gson: Gson
) : JsonRpcClient {

    /**
     * Sends [request] and returns the deserialized `result` of the response with the same id.
     *
     * @param request the JSON-RPC request to serialize and send.
     * @param responseType the type the response `result` is deserialized into.
     * @return a [Single] emitting the result, or failing with [JsonRpcCallException] when the
     * response carries an `error` member.
     */
    override fun <R> call(request: JsonRpcRequest, responseType: Type): Single<R> {
        val requestStr = gson.toJson(request)
        return Single.zip(
            // Listed before the send; presumably so the response listener is attached
            // before the request goes out (depends on zip's subscription order; confirm).
            captureResponse<R>(request.id, responseType),
            rxWebSocket.sendMessage(requestStr),
            { response, _ -> response }
        )
    }

    /**
     * Waits for the first response whose id equals [requestId] and converts it either into
     * a value of [responseType] or into a [JsonRpcCallException] failure.
     */
    private fun <R> captureResponse(requestId: Long, responseType: Type): Single<R> {
        return jsonRpcResponseStream()
            .filter { response -> response.id == requestId }
            .firstOrError()
            .flatMap { response ->
                // Bind to a local so the null check holds regardless of how the property is declared.
                val error = response.error
                if (error != null) {
                    Single.error<R>(JsonRpcCallException(error.code, error.message))
                } else {
                    Single.just(gson.fromJson<R>(response.result, responseType))
                }
            }
    }

    /**
     * Incoming web socket messages parsed as JSON, narrowed to those that look like
     * JSON-RPC responses, and mapped to [JsonRpcResponse]. Parsing is moved to the
     * computation scheduler.
     */
    private fun jsonRpcResponseStream(): Observable<JsonRpcResponse> {
        return rxWebSocket.messages()
            .observeOn(Schedulers.computation())
            .map { gson.fromJson(it, JsonElement::class.java) }
            .filter { isJsonRpcResponse(it) }
            .map { gson.fromJson(it, JsonRpcResponse::class.java) }
    }

    /**
     * Returns `true` when [message] is a JSON object whose `jsonrpc` member is the primitive
     * `"2.0"` and which has a `result` or an `error` member.
     *
     * A missing, null or non-primitive `jsonrpc` member yields `false` instead of throwing,
     * so unrelated messages on the socket do not terminate the response stream.
     */
    private fun isJsonRpcResponse(message: JsonElement): Boolean {
        if (!message.isJsonObject()) return false
        val messageObj = message.asJsonObject
        val version = messageObj["jsonrpc"] ?: return false
        if (!version.isJsonPrimitive()) return false
        return version.asString == "2.0" &&
            (messageObj.has("result") || messageObj.has("error"))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment