Skip to content

Instantly share code, notes, and snippets.

@khajavi
Created August 5, 2017 10:24
Show Gist options
  • Save khajavi/f0b8b76e4df9f639687ad82a5bf628f9 to your computer and use it in GitHub Desktop.
Save khajavi/f0b8b76e4df9f639687ad82a5bf628f9 to your computer and use it in GitHub Desktop.
Three flavours of request-response pattern in Akka
// More info: http://www.nurkiewicz.com/2014/01/three-flavours-of-request-response.html
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, Props, Terminated}
import akka.event.LoggingReceive
import akka.util.Timeout
import akka.pattern.pipe
import scala.concurrent.duration._
import scala.language.postfixOps
object Main2 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("Hello")
val a = system.actorOf(Props[HelloWorld], "helloWorld")
system.actorOf(Props(classOf[Terminator], a), "terminator")
}
class Terminator(ref: ActorRef) extends Actor with ActorLogging {
context watch ref
def receive = {
case Terminated(_) =>
log.info("{} has terminated, shutting down system", ref.path)
context.system.terminate()
}
}
}
import akka.actor.{ActorLogging, ActorRef}
import akka.pattern.ask
import scala.util.{Failure, Success}
case class CheckHealth()
case class Ping()
case class Pong()
case class Up()
case class Down()
class MonitoringActor1 extends Actor with ActorLogging {
private val networkActor = context.actorOf(Props[NetworkActor], "network")
private var origin: Option[ActorRef] = None
def receive = {
case CheckHealth =>
networkActor ! Ping
origin = Some(sender)
case Pong =>
origin.foreach(_ ! Up)
origin = None
}
}
class MonitoringActor2 extends Actor with ActorLogging {
private val networkActor = context.actorOf(Props[NetworkActor], "network")
def receive = waitingForCheckHealth
private def waitingForCheckHealth: Receive = {
case CheckHealth =>
networkActor ! Ping
context become waitingForPong(sender)
}
private def waitingForPong(origin: ActorRef): Receive = {
case Pong =>
origin ! Up
context become waitingForCheckHealth
}
}
class MonitoringActor3 extends Actor with ActorLogging {
private val networkActor = context.actorOf(Props[NetworkActor], "network")
def receive = waitingForCheckHealth
private def waitingForCheckHealth: Receive = {
case CheckHealth =>
networkActor ! Ping
implicit val ec = context.dispatcher
val timeout = context.system.scheduler.scheduleOnce(2.second, self, Down)
context become waitingForPong(sender, timeout)
}
private def waitingForPong(origin: ActorRef, timeout: Cancellable): Receive = LoggingReceive {
case Pong =>
timeout cancel()
origin ! Up
context become receive
case Down =>
origin ! Down
context become receive
}
}
class MonitoringActor4 extends Actor with ActorLogging {
private val networkActor = context.actorOf(Props[NetworkActor], "network")
def receive = {
case CheckHealth =>
implicit val timeout: Timeout = 1.second
implicit val ec = context.dispatcher
val origin = sender
networkActor ? Ping andThen {
case Success(_) => origin ! Up
case Failure(_) => origin ! Down
}
}
}
class MonitoringActor5 extends Actor with ActorLogging {
private val networkActor = context.actorOf(Props[NetworkActor], "network")
def receive = LoggingReceive {
case CheckHealth =>
implicit val ec = context.dispatcher
networkActor.ask(Ping)(4.second).
map { _ => Up }.
recover { case _ => Down }.
pipeTo(sender)
}
}
class NetworkActor extends Actor with ActorLogging {
override def receive: Receive = {
case Ping =>
Thread.sleep(3000)
sender() ! Pong
}
}
object Solution1 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
private val monitoringActor = system.actorOf(Props[MonitoringActor1], "network")
implicit val timeout = Timeout(10 second)
(monitoringActor ? CheckHealth).onComplete {
case Success(v) => println(v)
case Failure(e) => println(e)
}
}
object Solution2 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
private val monitoringActor = system.actorOf(Props[MonitoringActor2], "network")
implicit val timeout = Timeout(10 second)
(monitoringActor ? CheckHealth).onComplete {
case Success(v) => println(v)
case Failure(e) => println(e)
}
}
object Solution3 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
private val monitoringActor = system.actorOf(Props[MonitoringActor3], "network")
implicit val timeout = Timeout(10 second)
(monitoringActor ? CheckHealth) onComplete {
case Success(v) => println(v)
case Failure(e) => println(e)
}
}
object Solution4 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
private val monitoringActor = system.actorOf(Props[MonitoringActor4], "network")
implicit val timeout = Timeout(10 second)
(monitoringActor ? CheckHealth) onComplete {
case Success(v) => println(v)
case Failure(e) => println(e)
}
}
object Solution5 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
private val monitoringActor = system.actorOf(Props[MonitoringActor5], "network")
implicit val timeout = Timeout(10 second)
(monitoringActor ? CheckHealth) onComplete {
case Success(v) => println(v)
case Failure(e) => println(e)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment