Skip to content

Instantly share code, notes, and snippets.

@josdirksen
Last active April 27, 2016 02:10
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save josdirksen/9e0dbdc59f82030086fa to your computer and use it in GitHub Desktop.
Save josdirksen/9e0dbdc59f82030086fa to your computer and use it in GitHub Desktop.
name := "akka-http-websockets"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC2",
"com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-RC2",
"com.typesafe.play" %% "play-json" % "2.3.4",
"org.akkamon" %% "akka-mon" % "1.0-SNAPSHOT",
"org.java-websocket" % "Java-WebSocket" % "1.3.0")
resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
import java.io.File
import akka.actor._
import akka.routing.{Routee, RemoveRoutee, ActorRefRoutee, AddRoutee}
import akka.stream.actor.ActorPublisher
import org.akkamon.core.ActorStack
import org.akkamon.core.instruments.CounterTrait
import play.api.libs.json.Json
import scala.annotation.tailrec
import scala.concurrent.duration._
/**
* for now a very simple actor, which keeps a separate buffer
* for each subscriber. This could be rewritten to store the
* vmstats in an actor somewhere centrally and pull them from there.
*
* Based on the standed publisher example from the akka docs.
*/
class VMStatsPublisher(router: ActorRef, id: Option[String]) extends ActorPublisher[String] with ActorStack {
case class QueueUpdated()
import akka.stream.actor.ActorPublisherMessage._
import scala.collection.mutable
val MaxBufferSize = 50
val queue = mutable.Queue[String]()
var queueUpdated = false;
// on startup, register with routee
override def preStart() {
println(s"Adding to router: $self")
router ! AddRoutee(ActorRefRoutee(self))
}
// cleanly remove this actor from the router. If
// we don't do this the router will also stop since
// we didn't specify a different supervisorstrategy
override def postStop(): Unit = {
println(s"Removing from router: $self")
router ! RemoveRoutee(ActorRefRoutee(self))
}
def receive = {
// receive new stats, add them to the queue, and quickly
// exit.
case stats: String =>
// remove the oldest one from the queue and add a new one
if (queue.size == MaxBufferSize) queue.dequeue()
queue += stats
if (!queueUpdated) {
queueUpdated = true
self ! QueueUpdated
}
// we receive this message if there are new items in the
// queue. If we have a demand for messages send the requested
// demand.
case QueueUpdated => deliver()
// the connected subscriber request n messages, we don't need
// to explicitely check the amount, we use totalDemand propery for this
case Request(amount) =>
deliver()
// subscriber stops, so we stop ourselves.
case Cancel =>
context.stop(self)
}
/**
* Deliver the message to the subscriber. In the case of websockets over TCP, note
* that even if we have a slow consumer, we won't notice that immediately. First the
* buffers will fill up before we get feedback.
*/
@tailrec final def deliver(): Unit = {
if (totalDemand == 0) {
id match {
case Some(i) => println(s"No more demand for $i")
case _ => println(s"No more demand for: $this")
}
}
if (queue.size == 0 && totalDemand != 0) {
// we can response to queueupdated msgs again, since
// we can't do anything until our queue contains stuff again.
queueUpdated = false
} else if (totalDemand > 0 && queue.size > 0) {
// also send a message to the counter
exporter.processCounter(s"count.invocation-actorpublisher-${(id.get)}")
onNext(queue.dequeue())
deliver()
}
}
}
/**
* Just a simple router, which collects some VM stats and sends them to the provided
* actorRef each interval.
*/
class VMActor(router: ActorRef, delay: FiniteDuration, interval: FiniteDuration) extends Actor {
import scala.concurrent.ExecutionContext.Implicits.global
context.system.scheduler.schedule(delay, interval) {
val json = Json.obj( "stats" -> getStats.map(el => el._1 -> el._2))
router ! Json.prettyPrint(json)
}
override def receive: Actor.Receive = {
case _ => // just ignore any messages
}
def getStats: Map[String, Long] = {
val baseStats = Map[String, Long](
"count.procs" -> Runtime.getRuntime.availableProcessors(),
"count.mem.free" -> Runtime.getRuntime.freeMemory(),
"count.mem.maxMemory" -> Runtime.getRuntime.maxMemory(),
"count.mem.totalMemory" -> Runtime.getRuntime.totalMemory()
)
val roots = File.listRoots()
val totalSpaceMap = roots.map(root => s"count.fs.total.${root.getAbsolutePath}" -> root.getTotalSpace) toMap
val freeSpaceMap = roots.map(root => s"count.fs.free.${root.getAbsolutePath}" -> root.getFreeSpace) toMap
val usuableSpaceMap = roots.map(root => s"count.fs.usuable.${root.getAbsolutePath}" -> root.getUsableSpace) toMap
baseStats ++ totalSpaceMap ++ freeSpaceMap ++ usuableSpaceMap
}
}
/**
* Simple router where we can add and remove routee. This actor is not
* immutable.
*/
class RouterActor extends Actor with CounterTrait {
var routees = Set[Routee]()
// to make stats more clear
actorName = "VMActor"
def receive = {
case ar: AddRoutee => routees = routees + ar.routee
case rr: RemoveRoutee => routees = routees - rr.routee
case msg => routees.foreach(_.send(msg, sender))
}
}
import java.net.URI
import akka.actor.Actor.Receive
import org.akkamon.core.ActorStack
import org.akkamon.core.exporters.StatsdExporter
import org.java_websocket.client.WebSocketClient
import org.java_websocket.drafts.{Draft_17}
import org.java_websocket.handshake.ServerHandshake
/**
* A very simple websocket client, which we'll use to simulate a slow client to show backpressure
* in action with websockets.
*/
object WSClient extends App {
val NumberOfClients = 10;
val RandomRange = 100;
val Base = 50;
// create and connect the client
1 to NumberOfClients foreach({ cnt =>
val client = new Client(cnt, Math.round(Math.random() * RandomRange + Base))
Thread.sleep(10);
client.connect();
}
)
// Implement specific callbacks
class Client(id: Int, delay: Long) extends WebSocketClient(new URI(s"ws://localhost:9001/stats?id=$id"), new Draft_17) {
var count = 0
val exporter = StatsdExporter
override def onMessage(message: String): Unit = {
Thread.sleep(delay);
exporter.processCounter(s"count.invocation-websocketclient-$id")
count+=1
if (count % 100 == 0) println(f"$id%2d:onmessage:$count%5d")
}
override def onClose(code: Int, reason: String, remote: Boolean): Unit = println("Websocket closed")
override def onOpen(handshakedata: ServerHandshake): Unit = println(s"Websocket openend: delay = $delay")
override def onError(ex: Exception): Unit = println("Websocket error" + ex);
}
}
import java.net.{SocketOptions, Inet4Address, InetAddress, Socket}
import java.util.concurrent.TimeoutException
import akka.actor.{ActorRef, Props, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.io.Inet
import akka.io.Inet.SO
import akka.routing.BroadcastGroup
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
/**
* Extractor to detect websocket messages. This checks whether the header
* is available, and whether it contains an actual upgrade message.
*/
object WSRequest {
def unapply(req: HttpRequest) : Option[HttpRequest] = {
if (req.header[UpgradeToWebsocket].isDefined) {
req.header[UpgradeToWebsocket] match {
case Some(upgrade) => Some(req)
case None => None
}
} else None
}
}
/**
* Simple websocket server using akka-http and akka-streams.
*
* Note that about 600 messages get queued up in the send buffer (on mac, 146988 is default socket buffer)
*/
object WSServer extends App {
// required actorsystem and flow materializer
implicit val system = ActorSystem("websockets")
implicit val fm = ActorFlowMaterializer()
// setup the actors for the stats
// router: will keep a list of connected actorpublisher, to inform them about new stats.
// vmactor: will start sending messages to the router, which will pass them on to any
// connected routee
val router: ActorRef = system.actorOf(Props[RouterActor], "router")
val vmactor: ActorRef = system.actorOf(Props(classOf[VMActor], router ,2 seconds, 25 milliseconds))
// Bind to an HTTP port and handle incoming messages.
// With the custom extractor we're always certain the header contains
// the correct upgrade message.
// We can pass in a socketoptions to tune the buffer behavior
// e.g options = List(Inet.SO.SendBufferSize(100))
val binding = Http().bindAndHandleSync({
case WSRequest(req@HttpRequest(GET, Uri.Path("/simple"), _, _, _)) => handleWith(req, Flows.reverseFlow)
case WSRequest(req@HttpRequest(GET, Uri.Path("/echo"), _, _, _)) => handleWith(req, Flows.echoFlow)
case WSRequest(req@HttpRequest(GET, Uri.Path("/graph"), _, _, _)) => handleWith(req, Flows.graphFlow)
case WSRequest(req@HttpRequest(GET, Uri.Path("/graphWithSource"), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource)
case WSRequest(req@HttpRequest(GET, Uri.Path("/stats"), _, _, _)) => handleWith(req, Flows.graphFlowWithStats(router, req.getUri().parameter("id")))
case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request")
}, interface = "localhost", port = 9001)
// binding is a future, we assume it's ready within a second or timeout
try {
Await.result(binding, 1 second)
println("Server online at http://localhost:9001")
} catch {
case exc: TimeoutException =>
println("Server took to long to startup, shutting down")
system.shutdown()
}
/**
* Simple helper function, that connects a flow to a specific websocket upgrade request
*/
def handleWith(req: HttpRequest, flow: Flow[Message, Message, Unit]) = req.header[UpgradeToWebsocket].get.handleMessages(flow)
}
/**
* This object contains the flows the handle the websockets messages. Each flow is attached
* to a websocket and gets executed whenever a message comes in from the client.
*/
object Flows {
/**
* The simple flow just reverses the sent message and returns it to the client. There
* are two types of messages, streams and normal textmessages. We only process the
* normal ones here, and ignore the others.
*/
def reverseFlow: Flow[Message, Message, Unit] = {
Flow[Message].map {
case TextMessage.Strict(txt) => TextMessage.Strict(txt.reverse)
case _ => TextMessage.Strict("Not supported message type")
}
}
/**
* Simple flow which just returns the original message
* back to the client
*/
def echoFlow: Flow[Message, Message, Unit] = Flow[Message]
/**
* Flow which uses a graph to process the incoming message.
*
* compute
* collect ~> broadcast ~> compute ~> zip ~> map
* compute
*
* We broadcast the message to three map functions, we
* then zip them all up, and map them to the response
* message which we return.
*
* @return
*/
def graphFlow: Flow[Message, Message, Unit] = {
Flow() { implicit b =>
import FlowGraph.Implicits._
val collect = b.add(Flow[Message].collect[String]({
case TextMessage.Strict(txt) => txt
}))
// setup the components of the flow
val compute1 = b.add(Flow[String].map(_ + ":1"))
val compute2 = b.add(Flow[String].map(_ + ":2"))
val compute3 = b.add(Flow[String].map(_ + ":3"))
val broadcast = b.add(Broadcast[String](3))
val zip = b.add(ZipWith[String,String,String,String]((s1, s2, s3) => s1 + s2 + s3))
val mapToMessage = b.add(Flow[String].map[TextMessage](TextMessage.Strict))
// now we build up the flow
broadcast ~> compute1 ~> zip.in0
collect ~> broadcast ~> compute2 ~> zip.in1
broadcast ~> compute3 ~> zip.in2
zip.out ~> mapToMessage
(collect.inlet, mapToMessage.outlet)
}
}
/**
* When the flow is materialized we don't really just have to respond with a single
* message. Any message that is produced from the flow gets sent to the client. This
* means we can also attach an additional source to the flow and use that to push
* messages to the client.
*
* So this flow looks like this:
*
* in ~> filter ~> merge
* newSource ~> merge ~> map
* This flow filters out the incoming messages, and the merge will only see messages
* from our new flow. All these messages get sent to the connected websocket.
*
*
* @return
*/
def graphFlowWithExtraSource: Flow[Message, Message, Unit] = {
Flow() { implicit b =>
import FlowGraph.Implicits._
// Graph elements we'll use
val merge = b.add(Merge[Int](2))
val filter = b.add(Flow[Int].filter(_ => false))
// convert to int so we can connect to merge
val mapMsgToInt = b.add(Flow[Message].map[Int] { msg => -1 })
val mapIntToMsg = b.add(Flow[Int].map[Message]( x => TextMessage.Strict(":" + randomPrintableString(200) + ":" + x.toString)))
val log = b.add(Flow[Int].map[Int](x => {println(x); x}))
// source we want to use to send message to the connected websocket sink
val rangeSource = b.add(Source(1 to 2000))
// connect the graph
mapMsgToInt ~> filter ~> merge // this part of the merge will never provide msgs
rangeSource ~> log ~> merge ~> mapIntToMsg
// expose ports
(mapMsgToInt.inlet, mapIntToMsg.outlet)
}
}
/**
* Creates a flow which uses the provided source as additional input. This complete scenario
* works like this:
* 1. When the actor is created it registers itself with a router.
* 2. the VMActor sends messages at an interval to the router.
* 3. The router next sends the message to this source which injects it into the flow
*/
def graphFlowWithStats(router: ActorRef, id: Option[String]): Flow[Message, Message, Unit] = {
Flow() { implicit b =>
import FlowGraph.Implicits._
id match {
case Some(i) => println(s"Connection received for stats from id: $i")
case _ => println(s"Connection received for stats no id")
}
// create an actor source
val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router, id))
// Graph elements we'll use
val merge = b.add(Merge[String](2))
val filter = b.add(Flow[String].filter(_ => false))
// convert to int so we can connect to merge
val mapMsgToString = b.add(Flow[Message].map[String] { msg => "" })
val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))
val statsSource = b.add(source)
// connect the graph
mapMsgToString ~> filter ~> merge // this part of the merge will never provide msgs
statsSource ~> merge ~> mapStringToMsg
// expose ports
(mapMsgToString.inlet, mapStringToMsg.outlet)
}
}
def randomPrintableString(length: Int, start:String = ""): String = {
if (length == 0) start else randomPrintableString(length -1, start + Random.nextPrintableChar())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment