Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
class SubscriberActor(statsActor: ActorRef) extends ActorSubscriber {
override def receive = {
case OnNext(message: Message) =>
for {
eventType <- message.headers.get("type") if eventType == "DiceRolled"
content <- parseOpt(message.body.decodeString("UTF-8"))
JInt(rolledNumber) <- content \ "rolledNumber"
} statsActor ! StatsActor.IncRollsCount(rolledNumber.toInt)
case OnComplete =>
log.info("Game stream completed, shutting down system")
system.shutdown()
case OnError(cause) =>
log.error(cause, "Publisher error occurred, shutting down system")
system.shutdown()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment