Skip to content

Instantly share code, notes, and snippets.

@hisui
Created March 27, 2014 03:57
Show Gist options
  • Save hisui/9799876 to your computer and use it in GitHub Desktop.
Save hisui/9799876 to your computer and use it in GitHub Desktop.
Play2.1+WebSocket
package controllers
import play.api.mvc._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.Play.current
import play.api.libs.functional.syntax._
import play.api.libs.json._
import play.api.libs.json.Json.toJson
import play.api.libs.iteratee.{Concurrent, Iteratee}
import play.api.libs.concurrent.Akka
import scala.concurrent.{Future, Promise}
import scala.concurrent.Future.{successful, failed}
import scala.reflect.ClassTag
import scala.collection.mutable
import akka.actor.{TypedProps, TypedActor}
/**
 * WebSocket entry point.
 *
 * Incoming JSON messages are dispatched through [[PacketHandler.handle]];
 * when a handler yields a reply packet, it is sent back over the same
 * connection.
 */
object Application extends Controller with PacketHandler[Connection] {

  /**
   * Opens a JSON WebSocket.
   *
   * The [[Connection]] only becomes available once the outgoing enumerator
   * has started, so it is handed to the incoming side through a promise.
   */
  def join = WebSocket.async[JsValue] { request =>
    val subj = Promise[Connection]()

    val in = Iteratee.foreach[JsValue] { e =>
      println("[debug] recv packet:" + e)
      subj.future.foreach { conn =>
        val reply = handle(conn, e)
        reply.onSuccess { case o =>
          o.foreach(conn.send(_))
        }
        // Previously a failed future was dropped silently (unknown kind,
        // malformed packet, handler error); at least make it visible.
        reply.onFailure { case err =>
          println("[debug] failed to handle packet: " + err)
        }
      }
    }
    // When the incoming stream ends, release the connection.
    .mapDone(_ => subj.future.foreach(_.destroy()))

    val out = Concurrent.unicast[JsValue](chan => subj.completeWith(Connection(chan)))

    successful(in -> out)
  }

  // Handler for PingPong packets: echoes the text with the current server time in milliseconds.
  on { conn => (e: PingPong) =>
    successful {
      PingPong(e.text, System.currentTimeMillis())
    }
  }
}
/** Outgoing side of a single client connection. */
trait Connection {

  /** Sends `e` to the client; a `Writes[T]` must be in implicit scope. */
  def send[T: Writes](e: T): Unit

  /** Sends `e` to the client as an event; an `Event[T]` must be in implicit scope. */
  def notify[T: Event](e: T): Unit

  /** Called when the connection is no longer needed. */
  def destroy(): Unit
}
/** Factory for [[Connection]] instances backed by an Akka typed actor. */
object Connection {

  /**
   * Creates a typed-actor proxy implementing [[Connection]] around a
   * [[ConnectionActor]] that pushes into `chan`.
   *
   * @param chan channel that outgoing JSON values are pushed into
   * @return an already-completed future holding the proxy
   */
  def apply(chan: Concurrent.Channel[JsValue]): Future[Connection] = {
    val props = TypedProps(classOf[Connection], new ConnectionActor(chan))
    val proxy: Connection = TypedActor(Akka.system).typedActorOf(props)
    successful(proxy)
  }
}
/**
 * [[Connection]] implementation that pushes outgoing JSON into a
 * `Concurrent.Channel`.
 *
 * Instances are created by `Connection.apply` as the implementation behind an
 * Akka typed-actor proxy.
 *
 * @param chan channel that serialized packets are pushed into
 */
class ConnectionActor(chan: Concurrent.Channel[JsValue]) extends Connection {

  /** Serializes `e` to JSON, logs it and pushes it into the channel. */
  def send[T: Writes](e: T): Unit = {
    val js = toJson(e)
    println("[debug] send packet: "+ js)
    chan.push(js)
  }

  /** Wraps `e` in an event packet via the implicit `Event[T]` and sends it. */
  def notify[T: Event](e: T): Unit = {
    send(implicitly[Event[T]].toJson(e))
  }

  /**
   * Stops the typed actor backing this connection.
   *
   * This used to be an empty stub, so the actor created for each WebSocket
   * was never stopped. `TypedActor.context` is only available while a typed
   * actor method is being invoked, so this must be called through the proxy
   * returned by `Connection.apply`, not on a bare instance.
   */
  def destroy(): Unit = {
    val ctx = TypedActor.context
    ctx.stop(ctx.self)
  }
}
/**
 * A handler that takes a raw JSON payload in a given context and
 * asynchronously produces a JSON result.
 *
 * @tparam Ctx context type passed to the handler
 * @tparam Arg argument type of the wrapped handler (a `ClassTag` is required)
 * @tparam Ret result type of the wrapped handler (a `ClassTag` is required)
 */
abstract class Acceptor[Ctx, Arg, Ret](implicit argTag: ClassTag[Arg], retTag: ClassTag[Ret]) {

  /** Processes the payload `js` in context `ctx`. */
  def accept(ctx: Ctx, js: JsValue): Future[JsValue]
}
/**
 * Associates a packet `kind` with the JSON codecs needed to call a typed
 * handler `Arg => Future[Ret]` from raw JSON.
 *
 * @param kind packet kind this bridge is registered under
 * @tparam Arg handler argument type (needs `Reads`)
 * @tparam Ret handler result type (needs `Writes`)
 */
class Bridge[Arg: ClassTag: Reads, Ret: ClassTag: Writes](val kind: String) {

  /**
   * Adapts a typed handler into an [[Acceptor]] working on `JsValue`.
   *
   * A payload that cannot be read as `Arg` yields a failed future carrying a
   * `JsResultException`. Previously `js.as[Arg]` threw that exception
   * synchronously, escaping the `Future`-based error handling of callers.
   *
   * @param f handler taking the context, then the decoded argument
   * @return an acceptor that decodes, invokes `f`, and encodes the result
   */
  def wrap[Ctx](f: Ctx => Arg => Future[Ret]): Acceptor[Ctx, Arg, Ret] =
    new Acceptor[Ctx, Arg, Ret] {
      def accept(ctx: Ctx, js: JsValue): Future[JsValue] =
        js.validate[Arg].fold[Future[JsValue]](
          errors => failed(JsResultException(errors)),
          arg => f(ctx)(arg).map(toJson(_))
        )
    }
}
/**
 * Describes how a value of type `T` is turned into an event packet.
 *
 * @param kind packet kind written into the event envelope
 */
class Event[T: ClassTag: Writes](val kind: String) {

  /** Serializes `e` and wraps it in a packet without a transaction id. */
  def toJson(e: T): JsValue = {
    val payload = Json.toJson(e)
    val envelope = new Packet(None, payload, kind)
    Json.toJson(envelope)
  }
}
/**
 * Registry of packet handlers keyed by packet kind, plus the dispatch logic
 * that routes an incoming JSON packet to the matching handler.
 *
 * @tparam Ctx context type passed to every handler
 */
trait PacketHandler[Ctx] {

  // kind -> acceptor; filled by `on`, read by `handle`.
  private[this] val handlers = mutable.Map[String,Acceptor[Ctx,_,_]]()

  /**
   * Registers `f` under the kind provided by the implicit [[Bridge]].
   * A later registration for the same kind replaces the earlier one.
   */
  def on[Arg, Ret](f: Ctx => Arg => Future[Ret])(implicit ev: Bridge[Arg, Ret]): Unit = {
    println("register handler: kind=" + ev.kind)
    handlers(ev.kind) = ev.wrap(f)
  }

  /**
   * Parses `js` as a [[Packet]] and dispatches it.
   *
   * - If the JSON is not a valid packet, or no handler is registered for its
   *   kind, the returned future fails.
   * - If the packet carries a transaction id, the result is wrapped in an
   *   [[RpcValue]]; a [[PacketException]] from the handler becomes an
   *   [[RpcError]]. Other failures are left in the future.
   * - Without a transaction id, a successful result is discarded and `None`
   *   is returned.
   */
  def handle(ctx: Ctx, js: JsValue): Future[Option[Packet]] =
    js.validate[Packet] match {
      case JsSuccess(packet, _) =>
        val outcome: Future[JsValue] = handlers.get(packet.kind) match {
          case Some(acceptor) => acceptor.accept(ctx, packet.data)
          case None => failed(new Exception("Unknown packet kind: '"+ packet.kind +"'"))
        }
        packet.tx match {
          case Some(tx) =>
            outcome
              .map(new RpcValue(tx, _))
              .recover {
                case e: PacketException => new RpcError(tx, toJson(e.getMessage), e.code)
              }
              .map(Some(_))
          case None =>
            outcome.map(_ => None)
        }
      case err: JsError =>
        failed(new Exception(err.toString))
    }
}
/**
 * Failure carrying a numeric code in addition to its message.
 * `PacketHandler.handle` turns it into an `RpcError` when the packet has a transaction id.
 */
class PacketException(message:String, val code:Int) extends Exception(message)
/**
 * Message envelope exchanged over the socket.
 *
 * @param tx   optional transaction id
 * @param data JSON payload
 * @param kind packet kind, used to look up the handler
 */
class Packet(val tx:Option[Int], val data:JsValue, val kind:String)
/** JSON codecs for [[Packet]]. */
object Packet {

  /**
   * Reads the fields "tx", "kind" and "data". A packet with a transaction id
   * becomes an [[RpcPacket]], otherwise an [[EventPacket]].
   */
  implicit val reads: Reads[Packet] = new Reads[Packet] {
    def reads(js: JsValue): JsResult[Packet] =
      (js \ "tx").validate[Option[Int]].flatMap { txid =>
        (js \ "kind").validate[String].flatMap { kind =>
          (js \ "data").validate[JsValue].map { data =>
            println("packet came: kind="+ kind)
            txid match {
              case Some(id) => new RpcPacket(id, data, kind)
              case None => new EventPacket(data, kind)
            }
          }
        }
      }
  }

  /**
   * Writes "tx" (when present) and "data", followed by:
   *  - "code" for an [[RpcError]],
   *  - nothing for any other [[RpcValue]],
   *  - "kind" for every other packet.
   */
  implicit val writes: Writes[Packet] = new Writes[Packet] {
    def writes(o: Packet): JsValue = {
      val txField: Seq[(String, JsValue)] = o.tx.toSeq.map(id => "tx" -> toJson(id))
      val dataField: Seq[(String, JsValue)] = Seq("data" -> toJson(o.data))
      // RpcError extends RpcValue, so it has to be matched first.
      val trailer: Seq[(String, JsValue)] = o match {
        case err: RpcError => Seq("code" -> toJson(err.code))
        case _: RpcValue => Seq.empty
        case other => Seq("kind" -> toJson(other.kind))
      }
      JsObject(txField ++ dataField ++ trailer)
    }
  }
}
/** A packet that carries a transaction id; produced by `Packet.reads` when "tx" is present. */
class RpcPacket(tx:Int, data:JsValue, kind:String) extends Packet(Some(tx), data, kind)
/** Result packet for transaction `tx`; its kind is the empty string and `Packet.writes` does not emit it. */
class RpcValue(tx:Int, data:JsValue) extends Packet(Some(tx), data, "")
/** Error result for transaction `tx`; `Packet.writes` additionally emits `code` as "code". */
class RpcError(tx:Int, data:JsValue
, val code:Int) extends RpcValue(tx, data)
/** A packet without a transaction id; produced by `Packet.reads` when "tx" is absent. */
class EventPacket(data:JsValue, kind:String) extends Packet(None, data, kind)
/** Ping/pong payload: a text and a timestamp (the handler in `Application` fills `time` with `System.currentTimeMillis()`). */
case class PingPong(text:String, time:Long)
/** JSON format and handler bridge for [[PingPong]]. */
object PingPong {

  /**
   * Reads and writes the fields "text" and "time".
   * Declared before `bridge` because `bridge` resolves its `Reads`/`Writes`
   * from this value during initialization.
   */
  implicit val format: Format[PingPong] = (
    (__ \ "text").format[String] and
    (__ \ "time").format[Long]
  )(PingPong.apply _, unlift(PingPong.unapply))

  /**
   * Registers PingPong under the packet kind "ping".
   * `Bridge` is a plain class with no companion `apply`, so it has to be
   * instantiated with `new`.
   */
  implicit val bridge: Bridge[PingPong, PingPong] = new Bridge[PingPong, PingPong]("ping")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment