Skip to content

Instantly share code, notes, and snippets.

@vic
Created December 9, 2021 23:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vic/9f6a3de334a18dee69c108d8e373ba93 to your computer and use it in GitHub Desktop.
Save vic/9f6a3de334a18dee69c108d8e373ba93 to your computer and use it in GitHub Desktop.
An Scala RPC API using Sloth (with binary Boopickle serialization) and Twitter Finagle Mux as transport.
/**
* Usage:
*
* ammonite SlothMux.sc runServer
*
* ammonite SlothMux.sc runClient
*/
import $ivy.`io.catbird::catbird-finagle:21.8.0`
import $ivy.`com.twitter::finagle-mux:21.8.0`
import $ivy.`com.github.cornerman::sloth:0.4.0`
import $ivy.`io.suzaku::boopickle:1.3.3`
import java.nio.ByteBuffer
import com.twitter.finagle.{Mux, Service, mux, Path}
import com.twitter.io.Buf
import com.twitter.util.{Await, Future}
import io.catbird.util._
import sloth._
import chameleon.ext.boopickle._
import boopickle.Default._
trait HelloService {
def hello(a: Int)(name: String): Future[String]
}
class HelloServiceImpl(greeting: String) extends HelloService {
override def hello(times: Int)(name: String): Future[String] = Future {
Range(0, times).map { _ => s"${greeting} ${name}" }.mkString("\n")
}
}
object ServerApp {
def muxReqToSlothReq(req: mux.Request): sloth.Request[ByteBuffer] = {
sloth.Request[ByteBuffer](
path = req.destination.showElems.toList,
payload = ByteBuffer.wrap(Buf.ByteArray.Owned.extract(req.body))
)
}
def byteBuffToMuxRes(req: mux.Request)(byteBuff: ByteBuffer): mux.Response = {
mux.Response(
ctxts = req.contexts,
buf = Buf.ByteArray.Owned(byteBuff.array())
)
}
def slothRouter: Router[ByteBuffer, Future] = Router[ByteBuffer, Future]
.route[HelloService](new HelloServiceImpl("Hola"))
def interpretReq(req: sloth.Request[ByteBuffer]): Future[ByteBuffer] = {
slothRouter(req).toEither match {
case Left(ServerFailure.HandlerError(err)) => Future.exception(err)
case Left(ServerFailure.DeserializerError(err)) => Future.exception(err)
case Left(ServerFailure.PathNotFound(path)) =>
Future.exception(new IllegalArgumentException(s"Path not found: ${path}"))
case Right(futureByteBuffer) => futureByteBuffer
}
}
val listening = Mux.server
.serve(
"127.0.0.1:9988",
Service.mk[mux.Request, mux.Response] { req =>
pprint.pprintln(s"GOT REQ ${req}")
Future(muxReqToSlothReq(req)).flatMap(interpretReq).map(byteBuffToMuxRes(req))
}
)
def run: Unit = {
println("Server started")
Await.result(listening)
}
}
@main
def runServer(): Unit = ServerApp.run
object ClientApp {
val service: Service[mux.Request, mux.Response] = Mux.client.newService("127.0.0.1:9988")
def slothReqToMuxReq(req: Request[ByteBuffer]): mux.Request = {
mux.Request(
dst = Path.read(req.path.mkString("/", "/", "")),
payload = Buf.ByteArray.Owned(req.payload.array())
)
}
def muxResToByteRes(res: mux.Response): ByteBuffer = {
val byteArr = Buf.ByteArray.Owned.extract(res.body)
ByteBuffer.wrap(byteArr)
}
val reqTransport = new RequestTransport[ByteBuffer, Future] {
override def apply(request: sloth.Request[ByteBuffer]): Future[ByteBuffer] =
service(slothReqToMuxReq(request)).map(muxResToByteRes)
}
val client = sloth.Client[ByteBuffer, Future, ClientException](reqTransport)
val api: HelloService = client.wire[HelloService]
def run: Unit = {
val app: Future[Unit] =
api.hello(2)("Vic").map { res =>
pprint.pprintln(s"GOT RES ${res}")
assert(res == "Hola Vic\nHola Vic")
}
Await.result(app)
}
}
@main
def runClient(): Unit = ClientApp.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment