Created
March 27, 2014 03:57
-
-
Save hisui/9799876 to your computer and use it in GitHub Desktop.
Play2.1+WebSocket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import play.api.mvc._ | |
import play.api.libs.concurrent.Execution.Implicits.defaultContext | |
import play.api.Play.current | |
import play.api.libs.functional.syntax._ | |
import play.api.libs.json._ | |
import play.api.libs.json.Json.toJson | |
import play.api.libs.iteratee.{Concurrent, Iteratee} | |
import play.api.libs.concurrent.Akka | |
import scala.concurrent.{Future, Promise} | |
import scala.concurrent.Future.{successful, failed} | |
import scala.reflect.ClassTag | |
import scala.collection.mutable | |
import akka.actor.{TypedProps, TypedActor} | |
object Application extends Controller with PacketHandler[Connection] { | |
def join = WebSocket.async[JsValue] { request => | |
val subj = Promise[Connection]() | |
successful { | |
Iteratee.foreach[JsValue] { e => | |
println("[debug] recv packet:" + e) | |
subj.future.foreach(conn => handle(conn, e).onSuccess { case o => | |
o.foreach(conn.send(_)) | |
}) | |
} | |
.mapDone(_ => subj.future.foreach(_.destroy())) -> Concurrent | |
.unicast[JsValue](chan => subj.completeWith(Connection(chan))) | |
} | |
} | |
on { conn => e:PingPong => | |
successful { | |
PingPong(e.text, System.currentTimeMillis()) | |
} | |
} | |
} | |
trait Connection { | |
def send[T:Writes](e:T) | |
def notify[T:Event](e:T) | |
def destroy() | |
} | |
object Connection { | |
def apply(chan:Concurrent.Channel[JsValue]) :Future[Connection] = successful { | |
TypedActor(Akka.system) | |
.typedActorOf(TypedProps(classOf[Connection], new ConnectionActor(chan))) | |
} | |
} | |
class ConnectionActor(chan:Concurrent.Channel[JsValue]) extends Connection { | |
def send[T:Writes](e:T) { | |
val js = toJson(e) | |
println("[debug] send packet: "+ js) | |
chan.push(js) | |
} | |
def notify[T:Event](e:T) { | |
send(implicitly[Event[T]].toJson(e)) | |
} | |
def destroy() { | |
} | |
} | |
abstract class Acceptor[Ctx, Arg:ClassTag, Ret:ClassTag] { | |
def accept(ctx:Ctx, js:JsValue):Future[JsValue] | |
} | |
class Bridge[Arg:ClassTag:Reads, Ret:ClassTag:Writes](val kind:String) { | |
def wrap[Ctx](f: Ctx => Arg => Future[Ret]):Acceptor[Ctx,Arg,Ret] = | |
new Acceptor[Ctx,Arg,Ret] { | |
def accept(ctx:Ctx, js:JsValue):Future[JsValue] = f(ctx)(js.as[Arg]).map(toJson(_)) | |
} | |
} | |
class Event[T:ClassTag:Writes](val kind:String) { | |
def toJson(e:T):JsValue = Json.toJson(new Packet(None, Json.toJson(e), kind)) | |
} | |
trait PacketHandler[Ctx] { | |
private[this] val handlers = mutable.Map[String,Acceptor[Ctx,_,_]]() | |
def on[Arg,Ret](f: Ctx => Arg => Future[Ret])(implicit ev:Bridge[Arg,Ret]) { | |
println("register handler: kind=" + ev.kind) | |
handlers.put(ev.kind, ev.wrap(f)) | |
} | |
def handle(ctx:Ctx, js:JsValue):Future[Option[Packet]] = | |
js.validate[Packet].map[Future[Option[Packet]]] { o => | |
val res = handlers.get(o.kind).map(_.accept(ctx, o.data)) | |
.getOrElse { | |
failed(new Exception("Unknown packet kind: '"+ o.kind +"'")) | |
} | |
o.tx.map { tx => | |
res.map(new RpcValue(tx, _)).recover { | |
case e:PacketException => new RpcError(o.tx.get, toJson(e.getMessage), e.code) | |
} | |
.map(Some(_)) | |
} getOrElse res.map(_ => None) | |
} | |
.recoverTotal(e => failed(new Exception(e.toString))) | |
} | |
class PacketException(message:String, val code:Int) extends Exception(message) | |
class Packet(val tx:Option[Int], val data:JsValue, val kind:String) | |
object Packet { | |
implicit val reads:Reads[Packet] = new Reads[Packet] { | |
def reads(js:JsValue) = | |
for { | |
txid <- (js \ "tx").validate[Option[Int]] | |
kind <- (js \ "kind").validate[String] | |
data <- (js \ "data").validate[JsValue] | |
} yield { | |
println("packet came: kind="+ kind) | |
txid.map(e => new RpcPacket(e, data, kind)).getOrElse(new EventPacket(data, kind)) | |
} | |
} | |
implicit val writes:Writes[Packet] = new Writes[Packet] { | |
def writes(o: Packet): JsValue = | |
JsObject(o.tx.map(e => ("tx", toJson(e))).toSeq ++ Seq(("data", toJson(o.data))) | |
++ (o match { | |
case o:RpcError => Seq(("code", toJson(o.code))) | |
case o:RpcValue => Seq[(String,JsValue)]() | |
case _ => | |
Seq(("kind", toJson(o.kind))) | |
})) | |
} | |
} | |
class RpcPacket(tx:Int, data:JsValue, kind:String) extends Packet(Some(tx), data, kind) | |
class RpcValue(tx:Int, data:JsValue) extends Packet(Some(tx), data, "") | |
class RpcError(tx:Int, data:JsValue | |
, val code:Int) extends RpcValue(tx, data) | |
class EventPacket(data:JsValue, kind:String) extends Packet(None, data, kind) | |
case class PingPong(text:String, time:Long) | |
object PingPong { | |
implicit def format:Format[PingPong] = ( | |
(__ \ "text").format[String] and | |
(__ \ "time").format[Long]) (apply, unlift(unapply)) | |
implicit val bridge = Bridge[PingPong,PingPong]("ping") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment