Skip to content

Instantly share code, notes, and snippets.

@ryantanner
Last active December 21, 2015 21:09
Show Gist options
  • Save ryantanner/6366628 to your computer and use it in GitHub Desktop.
Save ryantanner/6366628 to your computer and use it in GitHub Desktop.
// Play controller
/**
 * Streams updates for one flight to the client as a `text/event-stream` response.
 *
 * The flight is looked up by its ident, then the `eventSource` actor is asked to
 * `Track` it. The reply is expected to be a `Connected` carrying the enumerator of
 * JSON values; any other reply fails the pattern match and therefore fails the
 * asynchronous result.
 *
 * @param flightIdent identifier passed to `Flight.findByIdent`
 */
def source(flightIdent: String) = Action {
  Async {
    Flight.findByIdent(flightIdent).flatMap { flight =>
      (eventSource ? Track(flight)).map {
        case Connected(stream) =>
          // Enumeratee that frames each JsValue as a server-sent event.
          val toServerSentEvents = EventSource[JsValue]()(
            encoder = CometMessage.jsonMessages,
            eventNameExtractor = pointNameExtractor,
            eventIdExtractor = pointIdExtractor
          )
          Ok.feed(stream &> toServerSentEvents).as("text/event-stream")
      }
    }
  }
}
/**
 * Actor that keeps one broadcast stream of JSON values per tracked flight.
 *
 * Messages handled:
 *  - `Track(flight)`: replies `Connected(enumerator)`, creating the stream for the flight on first use.
 *  - `Channel(flight)`: replies `Ready(flight, channel)` if the flight has a stream; otherwise no reply is sent.
 *  - `Stream(flight, value)`: pushes `value` into the flight's channel, if any.
 *  - `Stop(flight)`: ends the flight's channel, forgets the flight and forwards `Stop` to every registered source.
 *  - `Register(actor)`: remembers `actor` so it is notified on `Stop`.
 *
 * `ActorLogging` is mixed in because the handlers use `log`, which plain `Actor` does not provide.
 */
class EventSource extends Actor with akka.actor.ActorLogging {

  // Per flight: the enumerator handed out to clients and the channel used to push values into it.
  var connected = Map.empty[Flight, (Enumerator[JsValue], Concurrent.Channel[JsValue])]

  // Actors that are sent Stop(flight) whenever a flight's stream is stopped.
  var sources = List.empty[ActorRef]

  def receive = LoggingReceive {
    case Track(flight) =>
      connected.get(flight) match {
        case Some((enumerator, _)) =>
          sender ! Connected(enumerator)

        case None =>
          // First broadcast provides the push channel; the second wraps it so we get
          // a callback once the wrapped enumerator has no interested consumers left.
          val (upstream, channel) = Concurrent.broadcast[JsValue]
          val (broadcastingEnumerator, _) =
            Concurrent.broadcast(upstream, (b) => {
              log.info(s"interest to zero on ${flight.ident}")
              // Clean-up of actor state happens in the Stop handler, via the mailbox.
              // NOTE(review): until that Stop is processed, a Track for the same flight
              // would presumably be answered with this already-closed stream - confirm.
              self ! Stop(flight)
              b.close()
            })
          connected = connected.updated(flight, (broadcastingEnumerator, channel))
          sender ! Connected(broadcastingEnumerator)
      }

    case Channel(flight) =>
      connected.get(flight).foreach { case (_, channel) =>
        sender ! Ready(flight, channel)
      }

    case Stop(flight) =>
      log.info(s"stopping sources for ${flight.ident}")
      connected.get(flight).foreach { case (_, channel) =>
        channel.eofAndEnd
      }
      connected = connected - flight
      sources.foreach { s => s ! Stop(flight) }

    case Stream(flight, value) =>
      connected.get(flight).foreach { case (_, channel) =>
        channel.push(value)
      }

    case Register(actor) =>
      log.info(s"Registering ${actor.path.toString}")
      // Skip actors that are already registered so each source gets a single Stop per flight.
      if (!sources.contains(actor)) {
        sources = sources :+ actor
      }
  }
}
// Messages exchanged with the EventSource actor.

/** Reply to `Track`: carries the enumerator of JSON values for the tracked flight. */
case class Connected(enumerator: Enumerator[JsValue])
/** Reply to `Channel`: carries the flight's push channel (note the field is named `enumerator` but holds a `Concurrent.Channel`). */
case class Ready(flight: Flight, enumerator: Concurrent.Channel[JsValue])
/** Request a stream for `flight`; answered with `Connected`. */
case class Track(flight: Flight)
/** Ends the stream for `flight`; the actor also forwards it to every registered source. */
case class Stop(flight: Flight)
/** Pushes `value` into the channel of `flight`, if that flight currently has one. */
case class Stream(flight: Flight, value: JsValue)
/** Request the push channel for `flight`; answered with `Ready` only if the flight has a stream. */
case class Channel(flight: Flight)
/** Registers `actor` to be sent `Stop` messages when a flight is stopped. */
case class Register(actor: ActorRef)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment