Last active
August 29, 2015 14:24
-
-
Save edvakf/74c6e10b28b9f36dded5 to your computer and use it in GitHub Desktop.
ThriftRpc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.ning.http.client.providers.netty.NettyResponse | |
import org.apache.thrift.protocol.TBinaryProtocol | |
import org.apache.thrift.transport.{TMemoryInputTransport, TMemoryBuffer, THttpClient} | |
import org.apache.thrift.{TException, TServiceClient, TServiceClientFactory} | |
import play.api.Play.current | |
import play.api.libs.ws.WS | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.Future | |
import scala.util.control.NonFatal | |
class Rpc[A <: TServiceClient](factory: TServiceClientFactory[A], url: String) { | |
/** | |
* 例外変換のための層。Thriftのサービスを呼び出すときはこれを使うこと。 | |
* APIごとに独自の例外をキャッチしたいときは、その独自例外だけをcatchして変換するブロックをこのメソッドで包むこと | |
*/ | |
def call[B](f: A => B): B = { | |
val transport = new THttpClient(url) | |
val protocol = new TBinaryProtocol(transport) | |
val client: A = factory.getClient(protocol) | |
try { | |
transport.open() | |
f(client) | |
} catch { | |
case ex: TException => throw new RpcException("RPCから情報取得失敗", ex) | |
} finally { | |
transport.close() | |
} | |
} | |
/** | |
* PlayのWSを使ってThriftクライアントをラップする | |
* THttpClientを使うとブロッキングになるため、クライアントのsendXXXとrecvXXXを別々に呼ぶことで無理やり非同期化 | |
* TAsyncClientにHTTP版が無いのも回避 | |
*/ | |
def async[B](send: A => Unit, recv: A => B): Future[B] = { | |
val sendTransport = new TMemoryBuffer(128) // 中身はByteArrayOutputStreamなので書き込む度に大きくなる | |
val sendProtocol = new TBinaryProtocol(sendTransport) | |
val recvTransport = new TMemoryInputTransport() | |
val recvProtocol = new TBinaryProtocol(recvTransport) | |
val client: A = factory.getClient(recvProtocol, sendProtocol) | |
sendTransport.open() | |
recvTransport.open() | |
Future { | |
// ここでclientのsendXXXを呼んでsendTransportのTMemoryBufferにリクエストを書き込む | |
send(client) | |
}.recover { | |
case ex: TException => throw new RpcException("RPCリクエスト内容を生成できない", ex) | |
}.flatMap { | |
_ => | |
WS.url(url) | |
.withHeaders("Content-Type" -> "application/x-thrift") | |
.post(sendTransport.getArray) // ここでTMemoryBufferの中身をPOSTする | |
.recover { | |
case NonFatal(ex) => throw new RpcException("RPCに接続できない", ex) | |
} | |
}.map { | |
response => | |
if (response.status != 200) { | |
throw new RpcException(s"RPCのステータスコードが正しくない: ${response.status}", null) | |
} | |
// responseからstreamを得る方法もあるが、バグってるのでunderlyingを使った | |
// http://stackoverflow.com/a/22391267 | |
// WSまわりがPlay 2.4で変わるのでそのタイミングで変更してもよい | |
recvTransport.reset(response.underlying[NettyResponse].getResponseBodyAsBytes) | |
}.map { | |
_ => | |
// ここでclientのrecvXXXを呼んでrecvTransportからデータを読み込んでデコードする | |
recv(client) | |
}.recover { | |
case ex: TException => throw new RpcException("RPCのレスポンスが正しくない", ex) | |
}.andThen { | |
case _ => | |
sendTransport.close() | |
recvTransport.close() | |
} | |
} | |
} | |
class RpcException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyService { | |
val client = new Rpc(new MyService.Client.Factory, "http://192.168.x.x/my_service") | |
def someMethod(input Int): String = { | |
client.call { | |
client.getSomeMethod(input) | |
} | |
} | |
def someMethodAsync(input Int): Future[String] = { | |
client.async({ | |
_.sendSomeMethod(input) | |
}, | |
{ | |
_.recvSomeMethod() | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment