Skip to content

Instantly share code, notes, and snippets.

@kopos
Last active December 26, 2015 07:59
Show Gist options
  • Save kopos/7119476 to your computer and use it in GitHub Desktop.
Save kopos/7119476 to your computer and use it in GitHub Desktop.
Trying to grok funsvr.pdf
/*
* Combinators in Finagle:
*
* - flatMap
* - rescue
* - collect
* - andThen
* -
*/
/*
* Dependent Composition
* Version 1 - Synchronous services - Dependent Composition
*/
// Synchronous building blocks, given as signatures only (no bodies appear here).
// rewrite takes a user and a query and returns a String
// (presumably the query rewritten for that user; inferred from the name, confirm).
def rewrite(user: String, query: String): String
// search takes a query string and returns a Set[Result].
def search(query: String): Set[Result]
/**
 * Synchronous dependent composition (Version 1).
 *
 * Calls `rewrite(user, query)` first and passes its result to `search`;
 * the second call cannot start until the first has produced its value.
 *
 * @param user  forwarded to `rewrite`
 * @param query the raw query handed to `rewrite`
 * @return the `Set[Result]` that `search` produces for the rewritten query
 */
def psearch(user: String, query: String): Set[Result] =
  search(rewrite(user, query))
/*
* Version 1.5 - Boiler-plate pseudocode for services
*/
// Asynchronous counterparts of the synchronous signatures: the same parameters,
// but each result is wrapped in a Future. Signatures only; no bodies appear here.
def rewrite(user: String, query: String): Future[String]
def search(query: String): Future[Set[Result]]
//
// lol - looks like elixir here!
//
// Pseudocode: `|>` is not defined anywhere in this file. It stands for
// "feed the result of rewrite into search".
// NOTE(review): not valid Scala as written; the flatMap-based psearch later in
// this file looks like the concrete form of the same idea.
def psearch(user: String, query: String) =
rewrite(user, query) |> search(_)
//
// flatMap combinator - to solve the data dependency between
// search() and rewrite()
//
/**
 * Excerpt of the Future interface showing only `flatMap`.
 * The `...` line marks members that are omitted from this excerpt.
 */
trait Future[T] {
// Takes a function from T to Future[U] and returns a Future[U]; this is what lets a
// second asynchronous step depend on the value produced by the first.
def flatMap[U] (f: T => Future[U]): Future[U]
...
}
/*
* Version 1.8 - Service composition with flatMap
*/
/**
 * Asynchronous dependent composition: runs `rewrite`, then hands the
 * rewritten query to `search` through `flatMap`.
 */
def psearch(user: String, query: String) =
  rewrite(user, query).flatMap(rewritten => search(rewritten))

// Example call; the composed result is itself a Future of the result set.
val result: Future[Set[Result]] = psearch("darius", "twitter")
//
// rescue combinator - to handle partial methods
//
/**
 * Excerpt of the Future interface showing only `rescue`.
 * The `...` line marks members that are omitted from this excerpt.
 */
trait Future[T] {
...
// Takes a partial function from Throwable to Future[B] and returns a Future[B].
// Presumably `f` is consulted only when this future fails with a Throwable that
// `f` is defined for. TODO confirm against the real Future implementation.
def rescue[B] (f: PartialFunction[Throwable, Future[B]]): Future[B]
}
/*
* Dependent composition with Error Handling
* Version 2.0 - Service composition with flatMap and rescue
*/
/**
 * Dependent composition with error handling: the rewrite step is bounded by a
 * 50 ms timeout, and a timeout falls back to the caller's original query
 * before the search step runs.
 */
def psearch(user: String, query: String) = {
  // Bound the rewrite step to 50 milliseconds.
  val bounded = rewrite(user, query).within(50.milliseconds)
  // A TimeoutError is replaced by the unmodified query; other failures are not handled here.
  val effectiveQuery = bounded.rescue { case _: TimeoutError => Future.value(query) }
  effectiveQuery.flatMap(q => search(q))
}
/*
* Composing Multiple dependencies - Map Reduce
*/
// Signature only: turns a sequence of futures into a single future of a sequence.
def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
// Signature only: queries one segment (identified by `id`) and yields that segment's
// result set asynchronously. The result must be a Future so the per-segment calls
// can be gathered with `collect`.
def querySegment(id: Int, query: String): Future[Set[Result]]
/**
 * Scatter/gather search: issues one `querySegment` call per segment id in
 * `0 until NumSegments`, gathers them with `collect`, and merges the
 * per-segment sets into one set.
 *
 * @param query the query sent unchanged to every segment
 * @return a future of the union of all per-segment result sets
 */
def search(query: String): Future[Set[Result]] = {
  // One future per segment. Each element is a per-segment result *set*, which is
  // what the `collect` callback below receives as Seq[Set[Result]].
  val queries: Seq[Future[Set[Result]]] =
    (0 until NumSegments).map(id => querySegment(id, query))

  collect(queries) flatMap { results: Seq[Set[Result]] =>
    // Flatten the per-segment sets and de-duplicate via toSet.
    Future.value(results.flatten.toSet)
  }
}
/*
* Recursive Composition
*/
// Signature only: maps a query string to another query string
// (presumably a variation of the input; inferred from the name, confirm).
def permute(query: String): String
/**
 * Recursive composition: keeps permuting the query and searching until at
 * least `n` results have been accumulated or a search comes back empty.
 *
 * @param user    forwarded to `psearch`
 * @param query   the query to permute for the next round
 * @param results the results accumulated so far
 * @param n       the number of results at which the recursion stops
 * @return a future of the accumulated results
 */
def rsearch(user: String, query: String, results: Set[Result], n: Int): Future[Set[Result]] =
  if (results.size >= n)
    Future.value(results)
  else {
    val nextQuery = permute(query)
    psearch(user, nextQuery) flatMap { newResults =>
      if (newResults.nonEmpty)
        // Recurse with the permuted query and the merged result set.
        rsearch(user, nextQuery, results ++ newResults, n)
      else
        // An empty round ends the recursion with whatever has been gathered so far.
        Future.value(results)
    }
  }
////////////////////////////////////////////////////////////////////////////////
// A Service is a function from a request to a Future of a reply.
type Service[Req, Rep] = Req => Future[Rep]
// Client side: Http.newService yields a Service for the given address...
val client: Service[HttpReq, HttpRep] = Http.newService("twitter.com:80")
// ...and applying it to a request returns a Future of the reply.
val f: Future[HttpRep] = client(HttpReq("/"))
// Sample HTTP echo server on port 80: every request is answered with
// Status.OK and the request's own body.
Http.serve(
  ":80",
  (req: HttpReq) => Future.value(HttpRep(Status.OK, req.body))
)
// Primitive HTTP proxy: serves on port 8080 using a client Service for twitter.com:80
// as the handler, so the Service passed to serve is the client itself.
Http.serve(":8080", Http.newService("twitter.com:80"))
////////////////////////////////////////////////////////////////////////////////
// A Filter takes a request plus the Service to delegate to, and returns a Future of the reply.
type Filter[Req, Rep] = (Req, Service[Req, Rep]) => Future[Rep]
// Identity filter: passes the request straight to the given service and returns its result.
// NOTE(review): the lambda parameters carry no types and the val has no annotation, so this
// reads as pseudocode. Presumably a Filter[Req, Rep] is intended; confirm.
val identityFilter = { (req, service) => service(req) }
/**
 * Request timeout filter: delegates to the given service and bounds the
 * resulting future with `within(d)`.
 *
 * The explicit `Filter[Req, Rep]` result type gives the lambda an expected
 * type, so its parameter types can be inferred.
 *
 * @tparam Req request type of the wrapped service
 * @tparam Rep reply type of the wrapped service
 * @param d the duration passed to `within`
 * @return a filter that applies the timeout to each call of the service
 */
def timeoutFilter[Req, Rep](d: Duration): Filter[Req, Rep] =
  (req, service) => service(req).within(d)
// httpClient composed behind a 10-second timeout filter; the result is again a Service.
val httpClientWithTimeout: Service[HttpReq, HttpRep] =
  timeoutFilter(10.seconds).andThen(httpClient)
// Signature only: turns an HttpReq into a Future[AuthHttpReq]
// (presumably by authenticating the request; inferred from the names, confirm).
def authReq(req: HttpReq): Future[AuthHttpReq]
// Type-changing filter: accepts a plain HttpReq, converts it with authReq, and
// forwards the resulting AuthHttpReq to a service that only accepts AuthHttpReq.
val auth: (HttpReq, Service[AuthHttpReq, HttpRep]) => Future[HttpRep] =
  (req, service) => authReq(req).flatMap(authed => service(authed))
// A service that only accepts AuthHttpReq; its definition is elided (`...`) in these notes.
val authedService: Service[AuthHttpReq, HttpRep] = ...
// Composing auth in front of authedService yields a Service that accepts plain HttpReq.
val service: Service[HttpReq, HttpRep] =
auth andThen authedService
////////////////////////////////////////////////////////////////////////////////
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment