Created
June 23, 2011 17:27
-
-
Save derekwyatt/1043056 to your computer and use it in GitHub Desktop.
Shows a simple request/response wrapper for Akka Actors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Companion for the [[ReqRspActor]] class: holds the problem-report types,
 * the two result enumerations, and the `spawn` factory.
 */
object ReqRspActor {

  /** Values passed to `ReqRspActor.handleProblem` to describe what went wrong. */
  object ReqRspProtocol {
    /** Reported for a message the response handler is not defined at. */
    case class UnknownMessage(msg: String) extends Exception(msg)

    /** Carries the exception found on a completed request future. */
    case class FutureException(exception: Throwable)
  }

  /** Result of a response handler: was the incoming message the awaited response? */
  object HandledOrNot extends Enumeration {
    type HandledOrNot = Value
    val MessageHandled, MessageNotHandled = Value
  }

  /** Result of `handleProblem`: should the original request be sent again? */
  object ResendOrNot extends Enumeration {
    type ResendOrNot = Value
    val Resend, DoNotResend = Value
  }

  /**
   * Creates a request/response actor, links and starts it under `owner`,
   * and kicks it off by sending it `'Go`.
   *
   * If `owner` currently has `NoFaultHandlingStrategy`, a
   * `OneForOneStrategy` trapping `Throwable` is installed on it first.
   * NOTE(review): the literals 3 and 1000 are presumably the retry limit and
   * the time window in milliseconds -- confirm against the Akka API in use.
   *
   * @param owner   actor that will supervise the new actor
   * @param factory by-name constructor of the actor to spawn
   * @return the reference of the started actor
   */
  def spawn(owner: ActorRef)(factory: => ReqRspActor): ActorRef = {
    val ownerHasNoStrategy = owner.faultHandler == NoFaultHandlingStrategy
    if (ownerHasNoStrategy) {
      owner.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 3, 1000)
    }

    val worker = actorOf(factory)
    owner.startLink(worker)
    worker ! 'Go
    worker
  }
}
/**
 * One-shot actor that sends `message` to `recipient`, waits for the response,
 * hands it to `receiveResponse`, and stops itself once the response has been
 * handled.
 *
 * Subclasses implement `receiveResponse` and may override `handleProblem` to
 * request a resend when the request future fails or an unexpected message
 * arrives.
 *
 * @param recipient actor the request is sent to
 * @param message   the request to send
 */
abstract class ReqRspActor(recipient: ActorRef, message: Any) extends Actor {
  import ReqRspActor._
  import ReqRspActor.ResendOrNot._
  import ReqRspActor.HandledOrNot._

  self.lifeCycle = Permanent

  /** This actor's own reference. */
  def myref = self

  /** Handler for candidate responses; reports whether the message was handled. */
  type ResponseHandler = PartialFunction[Any, HandledOrNot]

  /**
   * Processes the response to the request. Return `MessageHandled` to finish
   * (the actor unlinks from its supervisor and stops) or `MessageNotHandled`
   * to keep waiting.
   */
  def receiveResponse: ResponseHandler

  case class ResponseReceived(msg: Any)

  /**
   * Sends `msg` to `to` as a future-based request and registers a completion
   * callback: a failed future is reported through `handleProblem` (which
   * decides between stopping and resending); a successful result is forwarded
   * to this actor's own mailbox.
   */
  private def sendRequest(to: ActorRef, msg: Any): Unit = {
    (to !!! msg) onComplete { (future) =>
      future.exception match {
        case Some(exception) =>
          if (handleProblem(ReqRspProtocol.FutureException(exception)) == DoNotResend)
            stop
          else
            // Retry the request this call was actually made with.
            sendRequest(to, msg)
        case _ =>
          try {
            // `.get` stays inside the try on purpose: a missing result throws
            // and is dropped together with any send failure below.
            self ! future.result.get
          } catch {
            // TODO: Should log something here, but the bottom line is that
            // the actor to which this future "belongs" could, in fact,
            // have left the building by now... This is a consequence of
            // using futures
            // Only ordinary exceptions are ignored; fatal errors
            // (e.g. OutOfMemoryError) are no longer swallowed.
            case _: Exception =>
          }
      }
    }
  }

  /**
   * Behavior used after the request has been sent: runs `receiveResponse`
   * and, when it reports `MessageHandled`, unlinks from the supervisor (if
   * any) and stops. `MessageNotHandled` leaves the actor waiting.
   */
  def processResponse: Receive = {
    receiveResponse andThen {
      case MessageNotHandled =>
      case MessageHandled =>
        self.supervisor.foreach(_ ! Unlink(self))
        stop
    }
  }

  /** Initial behavior: on `'Go`, send the request and switch to response handling. */
  def receive: Receive = {
    case 'Go =>
      sendRequest(recipient, message)
      become(processResponse orElse handleUnknown)
  }

  /**
   * Fallback for messages `receiveResponse` is not defined at: reports an
   * `UnknownMessage` through `handleProblem`, then stops or resends the
   * original request depending on the answer.
   */
  def handleUnknown: Receive = {
    case msg =>
      if (handleProblem(ReqRspProtocol.UnknownMessage("Unrecognized message was received: (" + msg + ")")) == DoNotResend)
        stop
      else
        sendRequest(recipient, message)
  }

  /** Stops this actor by sending itself a `PoisonPill`. */
  protected def stop: Unit = self ! PoisonPill

  /**
   * Hook invoked with a `ReqRspProtocol.FutureException` or
   * `ReqRspProtocol.UnknownMessage`. The default never resends.
   *
   * NOTE(review): the `FutureException` call is made from inside the future's
   * completion callback, which may not run on this actor's thread -- confirm
   * before touching actor state in an override.
   *
   * @param message description of the problem
   * @return `Resend` to send the request again, `DoNotResend` to stop
   */
  def handleProblem(message: Any): ResendOrNot = DoNotResend
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ReqRspActor._

// inside another actor - i.e. 'self' is valid
// sends a message to a database actor and gets the result, processes it and dies
val parentActor = self
spawn(parentActor)(new ReqRspActor(databaseActor, Retrieve("Something")) {
  import ReqRspActor.HandledOrNot._

  // Declared as ResponseHandler (PartialFunction[Any, HandledOrNot]) so that it
  // matches the abstract member; with Receive the MessageHandled result would
  // be discarded and the override would not conform.
  def receiveResponse: ResponseHandler = {
    case RetrieveResults(results) =>
      // process it here
      MessageHandled
  }
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment