Skip to content

Instantly share code, notes, and snippets.

@edvakf
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edvakf/74c6e10b28b9f36dded5 to your computer and use it in GitHub Desktop.
Save edvakf/74c6e10b28b9f36dded5 to your computer and use it in GitHub Desktop.
ThriftRpc
import com.ning.http.client.providers.netty.NettyResponse
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TMemoryInputTransport, TMemoryBuffer, THttpClient}
import org.apache.thrift.{TException, TServiceClient, TServiceClientFactory}
import play.api.Play.current
import play.api.libs.ws.WS
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal
class Rpc[A <: TServiceClient](factory: TServiceClientFactory[A], url: String) {
/**
* 例外変換のための層。Thriftのサービスを呼び出すときはこれを使うこと。
* APIごとに独自の例外をキャッチしたいときは、その独自例外だけをcatchして変換するブロックをこのメソッドで包むこと
*/
def call[B](f: A => B): B = {
val transport = new THttpClient(url)
val protocol = new TBinaryProtocol(transport)
val client: A = factory.getClient(protocol)
try {
transport.open()
f(client)
} catch {
case ex: TException => throw new RpcException("RPCから情報取得失敗", ex)
} finally {
transport.close()
}
}
/**
* PlayのWSを使ってThriftクライアントをラップする
* THttpClientを使うとブロッキングになるため、クライアントのsendXXXとrecvXXXを別々に呼ぶことで無理やり非同期化
* TAsyncClientにHTTP版が無いのも回避
*/
def async[B](send: A => Unit, recv: A => B): Future[B] = {
val sendTransport = new TMemoryBuffer(128) // 中身はByteArrayOutputStreamなので書き込む度に大きくなる
val sendProtocol = new TBinaryProtocol(sendTransport)
val recvTransport = new TMemoryInputTransport()
val recvProtocol = new TBinaryProtocol(recvTransport)
val client: A = factory.getClient(recvProtocol, sendProtocol)
sendTransport.open()
recvTransport.open()
Future {
// ここでclientのsendXXXを呼んでsendTransportのTMemoryBufferにリクエストを書き込む
send(client)
}.recover {
case ex: TException => throw new RpcException("RPCリクエスト内容を生成できない", ex)
}.flatMap {
_ =>
WS.url(url)
.withHeaders("Content-Type" -> "application/x-thrift")
.post(sendTransport.getArray) // ここでTMemoryBufferの中身をPOSTする
.recover {
case NonFatal(ex) => throw new RpcException("RPCに接続できない", ex)
}
}.map {
response =>
if (response.status != 200) {
throw new RpcException(s"RPCのステータスコードが正しくない: ${response.status}", null)
}
// responseからstreamを得る方法もあるが、バグってるのでunderlyingを使った
// http://stackoverflow.com/a/22391267
// WSまわりがPlay 2.4で変わるのでそのタイミングで変更してもよい
recvTransport.reset(response.underlying[NettyResponse].getResponseBodyAsBytes)
}.map {
_ =>
// ここでclientのrecvXXXを呼んでrecvTransportからデータを読み込んでデコードする
recv(client)
}.recover {
case ex: TException => throw new RpcException("RPCのレスポンスが正しくない", ex)
}.andThen {
case _ =>
sendTransport.close()
recvTransport.close()
}
}
}
class RpcException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)
class MyService {
val client = new Rpc(new MyService.Client.Factory, "http://192.168.x.x/my_service")
def someMethod(input Int): String = {
client.call {
client.getSomeMethod(input)
}
}
def someMethodAsync(input Int): Future[String] = {
client.async({
_.sendSomeMethod(input)
},
{
_.recvSomeMethod()
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment