Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save sadache/2552064 to your computer and use it in GitHub Desktop.
Save sadache/2552064 to your computer and use it in GitHub Desktop.
Comparison of Comet, SSE and WebSocket server to client communication with Playframework 2.0 in Scala
/**
* Handles the comet event stream.
*/
def cometStream = Action {
AsyncResult {
implicit val timeout = Timeout(5.seconds)
val actor=Akka.system.actorOf(Props[EventListener])
// Actor is listening for event on the eventStream
Akka.system.eventStream.subscribe(actor,classOf[ChangeEvent])
// For each event, stream the data to client
(actor ? "start").mapTo[Enumerator[JsValue]].asPromise.map { chunks =>
Ok.stream((chunks) &> Comet( callback = "parent.onEvent"))
}
}
}
/**
* Handles the SSE event stream.
*/
def eventSourceStream = Action {
AsyncResult
{
implicit val timeout = Timeout(5.seconds)
val actor=Akka.system.actorOf(Props[EventListener])
// Actor is listening for event on the eventStream
Akka.system.eventStream.subscribe(actor,classOf[ChangeEvent])
// For each event, stream the data to client
(actor ? "start").mapTo[Enumerator[JsValue]].asPromise.map { chunks =>
Ok.feed(chunks &> EventSource()).as("text/event-stream")
}
}
}
/**
* Handles the websocket event stream.
*/
def listenEvents() = WebSocket.async[JsValue] { request =>
implicit val timeout = Timeout(5.seconds)
val actor=Akka.system.actorOf(Props[EventListener])
// Actor is listening for event on the eventStream
Akka.system.eventStream.subscribe(actor,classOf[ChangeEvent])
// For each event, stream the data to client
val iteratee = Iteratee.foreach[JsValue] {event => println(event)}
(actor ? "start").mapTo[Enumerator[JsValue]].asPromise.map {
chunks =>
(iteratee,chunks)
}
}
}
@xcarpentier
Copy link

Why do you used Akka to contruct your stream.

I had a SSE dev but it's fail on undetermined situation (ie. AWS micro, broadcast, ...).

Could you please give me some help ?

  val (broadcast, _) = Concurrent.broadcast ({
    Enumerator.generateM[JsValue](getUrl(URL_STATUS)) &> Enumeratee.mapM[JsValue] {
      t => Promise.timeout(t, Random.nextInt(30) + 30, TimeUnit.SECONDS)
    }
  })

  def stream = Action {
    Ok.feed(broadcast &> EventSource()).as("text/event-stream")
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment