Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jeremyrsmith
Last active November 5, 2015 03:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeremyrsmith/96f5cdce8d1bb719a281 to your computer and use it in GitHub Desktop.
Save jeremyrsmith/96f5cdce8d1bb719a281 to your computer and use it in GitHub Desktop.
/**
* This demonstrates creating a trivial protocol and service on top of Mux.
* Payloads are serialized with scala.pickling, but it could just as easily use Scodec,
* Avro, or anything else that can serialize to and deserialize from a byte array. All
* that would need replacing are the BinaryPickleFilter and the PickleBinaryFilter.
*
*/
import com.twitter.finagle._
import com.twitter.io.Buf
import com.twitter.util.{Await, Future}
import scala.pickling._, Defaults._, binary._
/**
 * Mixin with the two conversions between Finagle's `Buf` and a raw byte array
 * that the Mux <-> Binary filters share.
 *
 * NOTE(review): both helpers go through `Buf.ByteArray.Owned`; twitter-util
 * documents the `Owned` variants as avoiding defensive copies, so callers
 * should presumably not mutate the arrays afterwards — confirm against the
 * util version in use.
 */
trait BinaryFilter {

  /** Pulls the bytes out of `buf` as an `Array[Byte]`. */
  def extract(buf: Buf): Array[Byte] =
    Buf.ByteArray.Owned.extract(buf)

  /** Wraps `arr` in a `Buf`. */
  def inject(arr: Array[Byte]): Buf =
    Buf.ByteArray.Owned(arr)
}
// Filter Mux <-> Binary
/**
 * Server-side adapter: exposes a byte-array service as a Mux service.
 *
 * The body of the incoming Mux request is handed to the wrapped service as an
 * `Array[Byte]`, and the bytes it answers with become the body of the Mux
 * response.
 */
object MuxBinary
  extends Filter[mux.Request, mux.Response, Array[Byte], Array[Byte]]
  with BinaryFilter {

  def apply(
    muxReq: mux.Request,
    service: Service[Array[Byte], Array[Byte]]
  ): Future[mux.Response] = {
    val payload = extract(muxReq.body)
    service(payload).map { replyBytes =>
      mux.Response(inject(replyBytes))
    }
  }
}
/**
 * Client-side adapter: lets a caller that speaks byte arrays talk to a Mux
 * service.
 *
 * The outgoing bytes are sent as the body of a Mux request addressed to
 * `Path.empty`; the body of the Mux response is returned as an `Array[Byte]`.
 */
object BinaryMux
  extends Filter[Array[Byte], Array[Byte], mux.Request, mux.Response]
  with BinaryFilter {

  def apply(
    binReq: Array[Byte],
    service: Service[mux.Request, mux.Response]
  ): Future[Array[Byte]] = {
    val outbound = mux.Request(Path.empty, inject(binReq))
    service(outbound).map { rep =>
      extract(rep.body)
    }
  }
}
// Filter from Binary <- scala.pickling -> Req/Rep
/**
 * Server-side codec: decodes a binary pickle into a `Req`, calls the typed
 * service, and encodes the resulting `Rep` back into a binary pickle.
 *
 * Decoding is performed inside `Future(...)` so that a payload which cannot be
 * unpickled produces a failed `Future` rather than an exception thrown
 * synchronously out of `apply`.
 *
 * NOTE(review): `req` is whatever bytes the wrapped transport delivered;
 * unpickling data from an untrusted peer deserves the usual deserialization
 * scrutiny — confirm which picklers can be reached at runtime.
 *
 * @tparam Req request type expected by the wrapped service
 * @tparam Rep response type produced by the wrapped service
 * @param upReq unpickler used to decode incoming request bytes
 * @param pRep  pickler used to encode outgoing responses
 */
class BinaryPickleFilter[Req, Rep](implicit upReq: Unpickler[Req], pRep: Pickler[Rep])
  extends Filter[Array[Byte], Array[Byte], Req, Rep] {

  /**
   * @param req     pickled request bytes
   * @param service typed service that handles the decoded request
   * @return the pickled response bytes, or a failed `Future` if decoding,
   *         the service call, or encoding fails
   */
  def apply(req: Array[Byte], service: Service[Req, Rep]): Future[Array[Byte]] =
    Future(req.unpickle[Req])  // lift the decode so a bad payload fails the Future
      .flatMap(service)        // a Service is a function Req => Future[Rep]
      .map(_.pickle.value)
}
/**
 * Client-side codec: encodes a `Req` as a binary pickle, sends it through the
 * byte-array service, and decodes the returned bytes into a `Rep`.
 *
 * @tparam Req request type offered to callers
 * @tparam Rep response type handed back to callers
 * @param pReq  pickler used to encode outgoing requests
 * @param upRep unpickler used to decode incoming response bytes
 */
class PickleBinaryFilter[Req, Rep](implicit pReq: Pickler[Req], upRep: Unpickler[Rep])
  extends Filter[Req, Rep, Array[Byte], Array[Byte]] {

  def apply(req: Req, service: Service[Array[Byte], Array[Byte]]): Future[Rep] = {
    val encoded: Array[Byte] = req.pickle.value
    service(encoded).map { replyBytes =>
      replyBytes.unpickle[Rep]
    }
  }
}
// Some example messages
/**
 * Example request message.
 *
 * @param numbers integers carried by the request
 * @param options free-form string key/value pairs carried by the request
 */
sealed case class MyNiftyRequest(
  numbers: Seq[Int],
  options: Map[String, String]
)

/**
 * Example response message.
 *
 * @param sum integer result carried by the response
 */
sealed case class MyNiftyResponse(
  sum: Int
)
// Example service
/**
 * Example typed service: replies with the sum of the request's `numbers`.
 * The request's `options` are not read.
 */
object MyService extends Service[MyNiftyRequest, MyNiftyResponse] {

  def apply(in: MyNiftyRequest): Future[MyNiftyResponse] = Future {
    val total = in.numbers.sum
    MyNiftyResponse(total)
  }
}
// Compose Mux <-> Binary <- pickling -> MyService
// Server pipeline: Mux framing on the outside, the pickle codec in the middle,
// and the typed example service at the core.
val service: Service[mux.Request, mux.Response] = {
  val pickleCodec = new BinaryPickleFilter[MyNiftyRequest, MyNiftyResponse]
  MuxBinary.andThen(pickleCodec).andThen(MyService)
}
// Serve using Mux server
// InetSocketAddress lives in java.net and none of the imports at the top of
// this file bring it into scope, so import it here where it is used.
import java.net.InetSocketAddress

// Listen on port 8901; every connection is handed the same `service` instance
// via ServiceFactory.const.
Mux.server.serve(new InetSocketAddress(8901), ServiceFactory.const(service))
// Use the complementary filters for the client
// MyNiftyRequest => PickleBinary => BinaryMux => Mux
// Client pipeline: typed messages are pickled, framed as Mux requests, and
// sent to the Mux client for localhost:8901.
val clientFactory: ServiceFactory[MyNiftyRequest, MyNiftyResponse] = {
  val pickleCodec = new PickleBinaryFilter[MyNiftyRequest, MyNiftyResponse]
  val framed = pickleCodec.andThen(BinaryMux)
  framed.andThen(Mux.newClient("localhost:8901"))
}
// Call the server (in real life you wouldn't use Await)
// Issue one example request through the client pipeline, block until the
// reply arrives, and print it.
val demoClient = clientFactory.toService
val demoRequest = MyNiftyRequest(Seq(1, 2, 3, 4), Map("foo" -> "bar"))
val demoReply = Await.result(demoClient(demoRequest))
println(demoReply)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment