Skip to content

Instantly share code, notes, and snippets.

@dat-vikash
Created April 1, 2015 21:58
Show Gist options
  • Save dat-vikash/85294408faf98b4b213e to your computer and use it in GitHub Desktop.
Save dat-vikash/85294408faf98b4b213e to your computer and use it in GitHub Desktop.
/**
 * Mixin that lets websocket classes push messages to the client through a
 * concurrent broadcast channel.
 */
trait WebSocketChannel {

  // The Enumerator (`out`) and the Channel (`channel`) produced together by Concurrent.broadcast.
  val (out, channel) = Concurrent.broadcast[JsValue]

  /** Pushes `data` up the channel. */
  def push(data: JsValue) = {
    channel.push(data)
  }

  /** Ends the channel. */
  def cleanSocketResources() = {
    channel.end()
  }
}
/**
 * Spec for WSClientVisitor.
 *
 * The actor under test is created with MockWebSocketChannel mixed in, so every value
 * it pushes is recorded in `mockWebSocketChannelQueue` and can be inspected directly.
 */
class WSClientVisitorSpec extends TestKit(_system = Akka.system(FakeApplication())) with WordSpecLike with Matchers with ImplicitSender {

  // Actor under test, wired with the mock channel.
  val actorRef = TestActorRef(new WSClientVisitor("TEST") with MockWebSocketChannel, name = "test")

  // Direct reference to the actor instance so its state can be read and its receive called.
  val actor = actorRef.underlyingActor

  "Web Socket Client For Visitor" should {
    "register a new socket" in new WithApplication(app = FakeApplication(
      additionalConfiguration = Map("akka.event-handlers" -> List("akka.testkit.TestEventListener")),
      withGlobal = Some(new GlobalSettings() {
        override def onStart(app: api.Application): Unit = {
          Logger.info("Creating Senate Immersion Module Bill Information")
        }
      }))) {
      // Start from an empty queue so index 0 is the message produced by this test.
      actor.mockWebSocketChannelQueue = List.empty

      // Send the RegisterSocket message and verify the data recorded on the channel.
      actor.receive(RegisterSocket)
      actor.mockWebSocketChannelQueue(0) \ "type" should equal(JsString("event"))
      actor.mockWebSocketChannelQueue(0) \ "data" should equal(JsString("connection_on"))
    }
  }
}
// define our events
// Message asking a worker actor to register a socket.
// NOTE(review): every usage visible in this file writes `RegisterSocket` without
// parentheses, which refers to the companion object rather than an instance of this
// class — confirm that is intended before changing either side.
case class RegisterSocket()
// Reply to a registration: carries an enumerator of JsValue (`out`) and an ActorRef (`ref`).
case class Connected(out: Enumerator[JsValue], ref: ActorRef)
/**
 * Actor that answers socket registration requests and pushes messages to the client
 * through the channel provided by WebSocketChannel.
 *
 * @param name identifier supplied by the creator; not referenced inside the actor body
 */
class WSClientVisitor(name: String) extends Actor with ActorLogging with WebSocketChannel {

  def receive = {
    case RegisterSocket =>
      // Capture the sender in a local val before using it.
      val myClient = sender

      // Establish the connection: hand back the outbound enumerator and this actor's ref.
      myClient ! Connected(out, self)

      // Send a test message so the client can see the connection is live.
      push(Json.obj("type" -> "event", "data" -> "connection_on"))
  }
}
object Application extends Controller {

  /**
   * Opens a websocket for the given device.
   *
   * Each WebSocket connection state is managed by an Agent actor.
   * A new actor is created for each WebSocket, and is killed when the socket is closed.
   * For each play actor agent, a unique WebSocket Client Worker actor is created to
   * process WS events via the WSManager Actor.
   *
   * @param deviceId identifier of the connecting device; also used as the worker actor's name
   * @return an async websocket handler yielding the (inbound iteratee, outbound enumerator) pair
   */
  def websocketManager(deviceId: String) = WebSocket.async[JsValue] { request =>
    // Instantiate an actor to hold the web socket.
    // NOTE(review): the actor is named after deviceId, so a second concurrent connection
    // with the same deviceId would presumably clash on the actor name — confirm with callers.
    val webSocketWorker = Akka.system.actorOf(Props(new WSClientVisitor(deviceId)), name = deviceId)

    // Specify a timeout for the registration request.
    implicit val timeout = Timeout(Duration(2, "seconds"))

    // Register the device.
    (webSocketWorker ? RegisterSocket).map {
      // Connection established with the worker, which will process messages to our socket.
      case Connected(out, myWorker) =>
        val in = Iteratee.foreach[JsValue] { event =>
          myWorker ! Message(event)
        }.mapDone { _ =>
          // Log the close, then stop the worker. The previous code sent the Unit result
          // of Logger.info to the worker as a message and never stopped it.
          Logger.info(s" Received Websocket FIN command from device: $deviceId")
          // PoisonPill is queued like any other message, so events already sent to the
          // worker are processed before it stops.
          myWorker ! akka.actor.PoisonPill
        }
        (in, out)

      // Registration refused: ignore inbound data and return the enumerator supplied.
      case NotConnected(out) =>
        (Iteratee.ignore[JsValue], out)
    }
  }
}
/**
 * Test double for WebSocketChannel: pushed values are recorded in a list instead of
 * being pushed to the channel.
 */
trait MockWebSocketChannel extends WebSocketChannel {

  // Every value passed to push, in the order it was pushed.
  var mockWebSocketChannelQueue: List[JsValue] = List.empty

  /** Appends `data` to the recorded queue. */
  override def push(data: JsValue): Unit = {
    mockWebSocketChannelQueue :+= data
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment