Skip to content

Instantly share code, notes, and snippets.

@derekwyatt
Created June 23, 2011 17:27
Show Gist options
  • Save derekwyatt/1043056 to your computer and use it in GitHub Desktop.
Save derekwyatt/1043056 to your computer and use it in GitHub Desktop.
Shows a simple request/response wrapper for Akka Actors
object ReqRspActor {
object ReqRspProtocol {
case class UnknownMessage(msg: String) extends Exception(msg)
case class FutureException(exception: Throwable)
}
object HandledOrNot extends Enumeration {
type HandledOrNot = Value
val MessageHandled, MessageNotHandled = Value
}
object ResendOrNot extends Enumeration {
type ResendOrNot = Value
val Resend, DoNotResend = Value
}
def spawn(owner: ActorRef)(factory: => ReqRspActor): ActorRef = {
if (owner.faultHandler == NoFaultHandlingStrategy)
owner.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 3, 1000)
val a = actorOf(factory)
owner.startLink(a)
a ! 'Go
a
}
}
abstract class ReqRspActor(recipient: ActorRef, message: Any) extends Actor {
import ReqRspActor._
import ReqRspActor.ResendOrNot._
import ReqRspActor.HandledOrNot._
self.lifeCycle = Permanent
def myref = self
type ResponseHandler = PartialFunction[Any, HandledOrNot]
def receiveResponse: ResponseHandler
case class ResponseReceived(msg: Any)
private def sendRequest(to: ActorRef, msg: Any): Unit = {
(to !!! msg) onComplete { (future) =>
future.exception match {
case Some(exception) =>
if (handleProblem(ReqRspProtocol.FutureException(exception)) == DoNotResend)
stop
else
sendRequest(recipient, message)
case _ =>
try {
self ! future.result.get
} catch {
// TODO: Should log something here, but the bottom line is that
// the actor to which this future "belongs" could, in fact,
// have left the building by now... This is a consequence of
// using futures
case _ =>
}
}
}
}
def processResponse: Receive = {
receiveResponse andThen {
case MessageNotHandled =>
case MessageHandled =>
self.supervisor.foreach(_ ! Unlink(self))
stop
}
}
def receive: Receive = {
case 'Go =>
sendRequest(recipient, message)
become(processResponse orElse handleUnknown)
}
def handleUnknown: Receive = {
case msg =>
if (handleProblem(ReqRspProtocol.UnknownMessage("Unrecognized message was received: (" + msg + ")")) == DoNotResend)
stop
else
sendRequest(recipient, message)
}
protected def stop = self ! PoisonPill
def handleProblem(message: Any): ResendOrNot = DoNotResend
}
import ReqRspActor._
// inside another actor - i.e. 'self' is valid
// sends a message to a database actor and gets the result, processes it and dies
val parentActor = self
spawn(parentActor)(new ReqRspActor(databaseActor, Retrieve("Something")) {
import ReqRspActor.HandledOrNot._
def receiveResponse: Receive = {
case RetrieveResults(results) =>
// process it here
MessageHandled
}
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment