Last active
December 21, 2015 21:09
-
-
Save ryantanner/6366628 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Play controller
/**
 * Streams updates for a single flight to the client as Server-Sent Events.
 *
 * Looks the flight up by its identifier, asks `eventSource` to `Track` it and,
 * when the reply is `Connected`, feeds the returned enumerator through the
 * `EventSource` enumeratee with content type `text/event-stream`.
 *
 * NOTE(review): `eventSource` is presumably an `ActorRef` to the actor that
 * handles `Track`, with an implicit ask timeout in scope — confirm at the
 * definition site.
 *
 * @param flightIdent identifier passed to `Flight.findByIdent`
 */
def source(flightIdent: String) = Action {
  Async {
    for {
      flight <- Flight.findByIdent(flightIdent)
      reply <- eventSource ? Track(flight)
    } yield reply match {
      case Connected(stream) =>
        Ok.feed(stream &> EventSource[JsValue]()(
          encoder = CometMessage.jsonMessages,
          eventNameExtractor = pointNameExtractor,
          eventIdExtractor = pointIdExtractor
        )).as("text/event-stream")
      // The ask reply is untyped, so anything other than `Connected` would
      // otherwise surface as a MatchError inside the future.
      case _ =>
        InternalServerError("Unexpected reply from event source")
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Actor that owns one broadcast stream per tracked flight.
 *
 * Handled messages:
 *  - `Track(flight)`: replies with `Connected(enumerator)`, creating the
 *    broadcast pair for the flight on first request and reusing it afterwards.
 *  - `Channel(flight)`: replies with `Ready(flight, channel)` if the flight is
 *    currently connected; otherwise sends no reply.
 *  - `Stream(flight, value)`: pushes `value` into the flight's channel if the
 *    flight is connected; otherwise the value is dropped.
 *  - `Stop(flight)`: ends the flight's channel, forgets the flight and forwards
 *    `Stop(flight)` to every registered source.
 *  - `Register(actor)`: remembers `actor` as a source to notify on `Stop`.
 *
 * `ActorLogging` is mixed in (fully qualified, since the file's imports are not
 * visible here) because the body calls `log.info` and `Actor` alone does not
 * provide `log`.
 */
class EventSource extends Actor with akka.actor.ActorLogging {
  // Flight -> (enumerator handed to clients, channel used to push values).
  var connected = Map.empty[Flight, (Enumerator[JsValue], Concurrent.Channel[JsValue])]
  // Actors that are told to stop when a flight's stream is stopped.
  var sources = List.empty[ActorRef]

  def receive = LoggingReceive {
    case Track(flight) =>
      connected.get(flight) match {
        case Some((enumerator, _)) =>
          sender ! Connected(enumerator)
        case None =>
          val (e, c) = Concurrent.broadcast[JsValue]
          // Second broadcast layer: its callback is invoked with the
          // broadcaster when interest in the stream drops to zero.
          val (broadcastingEnumerator, _) =
            Concurrent.broadcast(e, (b) => {
              log.info(s"interest to zero on ${flight.ident}")
              self ! Stop(flight)
              b.close()
            })
          connected = connected + (flight -> (broadcastingEnumerator, c))
          sender ! Connected(broadcastingEnumerator)
      }

    case Channel(flight) =>
      for ((_, channel) <- connected.get(flight)) {
        sender ! Ready(flight, channel)
      }

    case Stop(flight) =>
      log.info(s"stopping sources for ${flight.ident}")
      for ((_, channel) <- connected.get(flight)) {
        channel.eofAndEnd()
      }
      connected = connected - flight
      sources.foreach { s => s ! Stop(flight) }

    case Stream(flight, value) =>
      for ((_, channel) <- connected.get(flight)) {
        channel.push(value)
      }

    case Register(actor) =>
      log.info(s"Registering ${actor.path.toString}")
      // Skip duplicates so a source that registers twice is not sent
      // every Stop twice.
      if (!sources.contains(actor)) {
        sources = sources :+ actor
      }
  }
}
/** Reply to `Track`: the enumerator a client should consume for the flight. */
final case class Connected(enumerator: Enumerator[JsValue])

/**
 * Reply to `Channel`: the push channel for `flight`.
 * Note that the field is named `enumerator` but its type is `Concurrent.Channel`.
 */
final case class Ready(flight: Flight, enumerator: Concurrent.Channel[JsValue])

/** Request a stream for `flight`; answered with `Connected`. */
final case class Track(flight: Flight)

/** End the stream for `flight`; also forwarded to every registered source. */
final case class Stop(flight: Flight)

/** Push `value` into the stream of `flight`, if that flight is connected. */
final case class Stream(flight: Flight, value: JsValue)

/** Request the push channel for `flight`; answered with `Ready` when connected. */
final case class Channel(flight: Flight)

/** Register `actor` as a source to be notified with `Stop`. */
final case class Register(actor: ActorRef)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment