Skip to content

Instantly share code, notes, and snippets.

@rpt
Last active June 9, 2016 10:18
Show Gist options
  • Save rpt/a74772ee0dd6ab470fcdcde0849d87a5 to your computer and use it in GitHub Desktop.
Save rpt/a74772ee0dd6ab470fcdcde0849d87a5 to your computer and use it in GitHub Desktop.
import akka.actor.{Actor, ReceiveTimeout, Stash}
import scala.concurrent.duration.Duration
/** Calls the echo actor using become/Stash.
  *
  * For every `Int` received, an `Echo.Request` is sent to `/user/echo` and the
  * actor switches to a temporary behaviour that waits for the matching
  * `Echo.Response`. Any other message arriving in the meantime is stashed and
  * replayed once the response has been handled.
  *
  * The 10 second deadline is enforced with a one-shot scheduler timer instead
  * of `setReceiveTimeout`: the receive timeout is an inactivity timer that is
  * restarted by every incoming message, so stashing unrelated messages would
  * keep postponing it and the request could wait forever.
  */
class A extends Actor with Stash {
  // The scheduler needs an implicit ExecutionContext to deliver the timer message.
  import context.dispatcher

  def receive = {
    case m: Int =>
      context.actorSelection("/user/echo") ! Echo.Request(m)
      // A fresh token per request: a late timer message from an earlier request
      // can never be mistaken for the timeout of the current one.
      val timeoutToken = new Object
      val timer =
        context.system.scheduler.scheduleOnce(Duration(10, "seconds"), self, timeoutToken)
      context.become {
        case Echo.Response(r) =>
          // Best effort: if the timer has already fired, its token is simply
          // not matched by the base behaviour afterwards.
          timer.cancel()
          println(r)
          context.unbecome()
          // Unstashed messages are processed after this handler returns,
          // i.e. by the behaviour restored above.
          unstashAll()
        case `timeoutToken` =>
          throw new Exception("timeout")
        case _ => stash() // no longer affects the deadline
      }
  }
}
import akka.actor.Actor
import akka.pattern.{AskSupport, AskTimeoutException}
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}
/** Calls the echo actor using ask/Future.
  *
  * Every incoming message is wrapped in an `Echo.Request` and asked of
  * `/user/echo` with a 30 second timeout. The completed result is sent back
  * to this actor in a private envelope so that it is handled inside `receive`:
  * exceptions thrown there go through normal actor failure handling, whereas
  * an exception thrown inside a `Future` callback runs on a dispatcher thread
  * and never reaches the actor.
  */
class B extends Actor with AskSupport {
  implicit val ec: ExecutionContext = context.dispatcher
  implicit val t: Timeout = Timeout(Duration(30, "seconds"))

  // Private envelope for the ask result; it cannot collide with messages
  // sent by other actors, so arbitrary user messages are still echoed.
  private case class EchoOutcome(result: scala.util.Try[Echo.Response])

  def receive = {
    case EchoOutcome(Success(Echo.Response(r))) => println(r)
    case EchoOutcome(Failure(_: AskTimeoutException)) => throw new Exception("timeout")
    case EchoOutcome(Failure(e)) => throw e
    case m =>
      val f = context.actorSelection("/user/echo") ? Echo.Request(m)
      // Only `self` is captured by the callback; the outcome is processed
      // later by the first three cases above, on the actor's own turn.
      f.mapTo[Echo.Response].onComplete(result => self ! EchoOutcome(result))
  }
}
import akka.actor.Actor
/** Calls the echo actor through the `Echo.echo` API function.
  *
  * Every incoming message is passed to `Echo.echo` together with a handler
  * that prints the value it is given.
  */
class C extends Actor {
  // Handler passed to Echo.echo; prints whatever it receives.
  private val printResult: Any => Unit = r => println(r)

  def receive = {
    case m =>
      Echo.echo(m)(printResult)
  }
}
import akka.actor.{Actor, ActorContext}
import akka.pattern.{AskSupport, AskTimeoutException}
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}
/** Echo actor: answers each `Request(x)` with a `Response(x)` sent to the sender. */
class Echo extends Actor {
  import Echo._

  def receive = {
    case Request(payload) =>
      sender() ! Response(payload)
  }
}
/** Message protocol and client-side helper for the echo actor. */
object Echo extends AskSupport {
  /** Asks the echo actor to send `x` back. */
  case class Request(x: Any)

  /** Reply carrying the echoed value `x`. */
  case class Response(x: Any)

  /** API function with ask/Future.
    *
    * Asks the actor at `/user/echo` to echo `x` (30 second timeout) and passes
    * the echoed value to `handle` when the reply arrives.
    *
    * NOTE(review): `handle` and the `throw`s below run inside the future's
    * completion callback on `context.dispatcher`, not inside the calling
    * actor's `receive`; presumably such exceptions never reach the caller or
    * its supervisor -- confirm before relying on them.
    *
    * @param x       value to be echoed
    * @param handle  callback invoked with the echoed value on success
    * @param context actor context used to look up the echo actor and to
    *                obtain the dispatcher the callback runs on
    */
  def echo(x: Any)(handle: (Any => Unit))(implicit context: ActorContext): Unit = {
    implicit val executor: ExecutionContext = context.dispatcher
    implicit val askTimeout: Timeout = Timeout(Duration(30, "seconds"))

    val echoActor = context.actorSelection("/user/echo")
    val reply = (echoActor ? Request(x)).mapTo[Response]

    reply.onComplete {
      case Failure(_: AskTimeoutException) => throw new Exception("timeout")
      case Failure(cause) => throw cause
      case Success(Response(echoed)) => handle(echoed)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment