Last active
November 5, 2015 03:49
-
-
Save jeremyrsmith/96f5cdce8d1bb719a281 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This demonstrates creating a trivial protocol and service on top of Mux. | |
* The protocol used is scala.pickling, but it could just as easily use Scodec, | |
* Avro, or anything else that can SerDe to/from a byte array. All that would | |
* need replacing is the BinaryPickleFilter and the PickleBinaryFilter. | |
* | |
*/ | |
import com.twitter.finagle._ | |
import com.twitter.io.Buf | |
import com.twitter.util.{Await, Future} | |
import scala.pickling._, Defaults._, binary._ | |
trait BinaryFilter { | |
def extract(buf: Buf) = Buf.ByteArray.Owned.extract(buf) | |
def inject(arr: Array[Byte]) = Buf.ByteArray.Owned(arr) | |
} | |
// Filter Mux <-> Binary | |
object MuxBinary | |
extends Filter[mux.Request, mux.Response, Array[Byte], Array[Byte]] | |
with BinaryFilter { | |
def apply(muxReq: mux.Request, service: Service[Array[Byte], Array[Byte]]) = | |
service(extract(muxReq.body)) map (arr => mux.Response(inject(arr))) | |
} | |
object BinaryMux | |
extends Filter[Array[Byte], Array[Byte], mux.Request, mux.Response] | |
with BinaryFilter { | |
def apply(binReq: Array[Byte], service: Service[mux.Request, mux.Response]) = | |
service(mux.Request(Path.empty, inject(binReq))) map (rep => extract(rep.body)) | |
} | |
// Filter from Binary <- scala.pickling -> Req/Rep | |
class BinaryPickleFilter[Req, Rep](implicit upReq: Unpickler[Req], pRep: Pickler[Rep]) | |
extends Filter[Array[Byte], Array[Byte], Req, Rep] | |
{ | |
def apply(req: Array[Byte], service: Service[Req, Rep]) = | |
service(req.unpickle[Req]) map (_.pickle.value) | |
} | |
class PickleBinaryFilter[Req, Rep](implicit pReq: Pickler[Req], upRep: Unpickler[Rep]) | |
extends Filter[Req, Rep, Array[Byte], Array[Byte]] | |
{ | |
def apply(req: Req, service: Service[Array[Byte], Array[Byte]]) = | |
service(req.pickle.value) map (_.unpickle[Rep]) | |
} | |
// Some example messages | |
sealed case class MyNiftyRequest(numbers: Seq[Int], options: Map[String, String]) | |
sealed case class MyNiftyResponse(sum: Int) | |
// Example service | |
object MyService extends Service[MyNiftyRequest, MyNiftyResponse] { | |
def apply(in: MyNiftyRequest) = | |
Future(MyNiftyResponse(in.numbers.sum)) | |
} | |
// Compose Mux <-> Binary <- pickling -> MyService | |
val service = | |
MuxBinary andThen | |
new BinaryPickleFilter[MyNiftyRequest, MyNiftyResponse] andThen | |
MyService | |
// Serve using Mux server | |
Mux.server.serve(new InetSocketAddress(8901), ServiceFactory.const(service)) | |
// Use the complementary filters for the client | |
// MyNiftyRequest => PickleBinary => BinaryMux => Mux | |
val clientFactory = | |
new PickleBinaryFilter[MyNiftyRequest, MyNiftyResponse] andThen | |
BinaryMux andThen | |
Mux.newClient("localhost:8901") | |
// Call the server (in real life you wouldn't use Await) | |
println(Await.result(clientFactory.toService.apply(MyNiftyRequest(Seq(1,2,3,4), Map("foo" -> "bar"))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment