Last active
August 2, 2017 02:41
-
-
Save olix0r/5818953 to your computer and use it in GitHub Desktop.
An example that uses a Broker to publish the results of a recurring computation (making jam from a list of fruits).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.concurrent.{Offer, Broker} | |
import com.twitter.finagle.{Filter, Service} | |
import com.twitter.finagle.http.{HttpMuxer, Request, Response, Status} | |
import com.twitter.server.TwitterServer | |
import com.twitter.util.{Await, Future} | |
/**
 * Example server that repeatedly turns fruit names into jam, hands each jam
 * over a [[com.twitter.concurrent.Broker]], and exposes the most recently
 * received jam over HTTP at `/jam`.
 */
object Harness extends TwitterServer {

  /** Fruit names to cycle through; configurable with the `fruits` flag. */
  val fruits =
    flag[Seq[String]]("fruits", Seq("apple", "banana", "cherry", "durian"), "A list of fruits")

  /** A fruit, identified only by its flavor. */
  case class Fruit(flavor: String)

  /** A jam, carrying the flavor of the fruit it was made from. */
  case class Jam(flavor: String)

  /** Service that turns a [[Fruit]] into a [[Jam]] of the same flavor. */
  object Jammer extends Service[Fruit, Jam] {
    def apply(veg: Fruit): Future[Jam] =
      Future.value(Jam(veg.flavor))
  }

  /** Filter that adapts a plain flavor string into a [[Fruit]] request. */
  object StringToFruit extends Filter[String, Jam, Fruit, Jam] {
    def apply(flavor: String, jammer: Service[Fruit, Jam]): Future[Jam] =
      jammer(Fruit(flavor))
  }

  /** Holds the most recently stored jam and serves it over HTTP. */
  object Shelf {
    // Written by `put` (called from the select loop in `main`) and read by
    // `Resource`; presumably on different threads, hence @volatile.
    @volatile private[this] var jam: Option[Jam] = None

    /** Replaces whatever jam is currently on the shelf with `j`. */
    def put(j: Jam): Unit = {
      jam = Some(j)
    }

    /**
     * HTTP endpoint: responds with the current jam's flavor followed by a
     * newline, or with status `NoContent` when no jam has been stored yet.
     */
    object Resource extends Service[Request, Response] {
      def apply(request: Request): Future[Response] = {
        // Bind the response once so that every mutation below and the value
        // returned all refer to the same object.
        val response = request.response
        jam match {
          case Some(Jam(flavor)) => response.contentString = flavor + "\n"
          case None => response.status = Status.NoContent
        }
        Future.value(response)
      }
    }
  }

  /** Produces an endless, cyclic stream of jams made from a list of fruits. */
  object JamLoop {
    // Built once: the composition does not depend on any per-call state.
    private[this] val jammer: Service[String, Jam] =
      StringToFruit andThen Jammer

    /**
     * Iterates through `fruits` again and again, making jam out of each one
     * and sending it on a broker.
     *
     * @param fruits flavors to cycle through; when empty, no loop is started
     *               and the returned offer never yields a jam
     * @return the receive side of the broker the jams are sent on
     */
    def apply(fruits: Seq[String]): Offer[Jam] = {
      val broker = new Broker[Jam]

      // Makes the jam for index `i`, offers it on the broker, and moves on to
      // the next index (wrapping around) once that attempt has completed,
      // whether it succeeded or failed.
      def loop(i: Int): Unit = {
        jammer(fruits(i))
          .flatMap { jam => broker.send(jam).sync() }
          .ensure { loop((i + 1) % fruits.length) }
      }

      // Guard: with no fruits, `fruits(0)` would throw and `% fruits.length`
      // would divide by zero.
      if (fruits.nonEmpty) loop(0)
      broker.recv
    }
  }

  /** Sending on this broker tells the select loop in `main` to stop. */
  val stopper: Broker[Unit] =
    new Broker[Unit]

  /**
   * Entry point: starts the jam loop, registers the `/jam` handler, keeps the
   * shelf updated until `stopper` fires, and waits on the HTTP server.
   * Non-fatal failures are reported on stdout; fatal errors propagate.
   */
  def main(): Unit = {
    import scala.util.control.NonFatal

    println("main")
    try {
      val jams = JamLoop(fruits())

      // Waits for either a stop signal or the next jam. A jam is shelved and
      // the wait is re-armed; a stop signal simply ends the loop.
      def run(): Unit = {
        Offer.select(
          stopper.recv,
          jams { jam =>
            Shelf.put(jam)
            run()
          }
        )
      }

      onExit {
        println("stopping")
        // Blocks until the select loop has received the stop signal.
        stopper.send(()).syncWait()
      }

      HttpMuxer.addRichHandler("/jam", Shelf.Resource)
      println("running")
      run()
      println("waiting")
      Await.ready(httpServer)
    } catch {
      // Only recoverable failures are reported here; fatal errors and
      // control-flow throwables are left to propagate.
      case NonFatal(e) =>
        println(e.getMessage)
    }
  }
}
Author
olix0r
commented
Jun 19, 2013
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment