Skip to content

Instantly share code, notes, and snippets.

@ericwush
Created July 9, 2017 07:08
Show Gist options
  • Save ericwush/b4dec0cefb8e29a007206eef68df357a to your computer and use it in GitHub Desktop.
Save ericwush/b4dec0cefb8e29a007206eef68df357a to your computer and use it in GitHub Desktop.
import akka.actor.{Actor, ActorRef, Props}
import com.redis._
import Models.{NotificationEvent, RedisSubscribeMessage, RedisUnsubscribeMessage}
import play.api.Logger
import play.api.libs.json.{JsError, JsSuccess, Json}
import scala.util.Try
object RedisSubActor {
def props(redis: Redis): Props = {
val subscriberProps = Props(new Subscriber(redis.client))
Props(new RedisSubActor(subscriberProps))
}
}
class RedisSubActor(subscriberProps: Props) extends Actor {
val logger = Logger(this.getClass)
var webSocket: ActorRef = _
private val subscriber = context.actorOf(subscriberProps)
subscriber ! Register(callback)
def receive: Receive = {
case RedisSubscribeMessage(chs) => sub(chs)
case RedisUnsubscribeMessage(chs) => unsub(chs)
}
private def sub(channels: Seq[String]) = {
webSocket = sender()
subscriber ! Subscribe(channels.toArray)
}
private def unsub(channels: Seq[String]) = {
subscriber ! Unsubscribe(channels.toArray)
}
private def callback(pubsub: PubSubMessage) = pubsub match {
case E(exception) => logger.error("Fatal error caused consumer dead.", exception)
case S(channel, no) => logger.debug(s"Subscribed to $channel and count = $no")
case U(channel, no) => logger.debug(s"Unsubscribed from $channel and count = $no")
case M(channel, msg) =>
Try(Json.parse(msg).validate[NotificationEvent]).map {
case success: JsSuccess[NotificationEvent] =>
val notificationEvent = success.get
webSocket ! notificationEvent
logger.debug(s"Pushed event to user id $channel, event: $notificationEvent")
case e: JsError => logger.error(s"Unrecognized event for user id $channel: ${JsError.toJson(e)}")
} recover {
case exception => logger.error(s"Unrecognized event for user id $channel: $exception")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment