-
-
Save antoniy/1db41c6b0bb411bdcbe0 to your computer and use it in GitHub Desktop.
Simple Ask pattern for Akka.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorContext, ActorRef} | |
import akka.event.LoggingReceive | |
import scala.collection.mutable | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent._ | |
/**
 * Implements a simple ask pattern that works in a single-JVM setup, so that futures can conveniently be used as
 * responses without the additional actor creation penalty.
 *
 * The asking side uses `??` (see [[SimpleAskPattern.AskableActorRef]]); the answering actor mixes in
 * [[SimpleAskPattern.AskableActor]], wraps its receive block in `AskableReceive` and replies with `!!`.
 *
 * User: Antoniy Chonkov <antoniy@gmail.com>
 * Date: 10/3/13
 * Time: 11:12 PM
 */
object SimpleAskPattern {

  /**
   * Message wrapper that is used with the underlying 'tell' operation. It holds the sender's promise for the result
   * as well as the actual message payload.
   *
   * The wrapper carries a live `Promise`, which is why the pattern is only meant for a single JVM.
   *
   * @param senderPromise Promise[Any] instance that will be completed with the response.
   * @param payload       The actual message that has been sent to the target actor.
   */
  case class AskMessageWrapper(senderPromise: Promise[Any], payload: Any)

  /**
   * Implicit conversion that adds the '??' method to ActorRef instances.
   *
   * @param ref The ActorRef instance we wrap around.
   * @tparam T The actual type may be any subtype of ActorRef.
   */
  implicit class AskableActorRef[T <: ActorRef](ref: T) {

    /**
     * Ask operation. It sends a message to an actor via 'tell', wrapping the actual message together with a promise
     * for the result, and returns the promise's future.
     *
     * If the promise has not been completed when the timeout elapses, it is failed with a `TimeoutException`.
     * If the timeout is zero or negative, nothing is sent and nothing is scheduled: a future that has already failed
     * with an `IllegalArgumentException` is returned instead.
     *
     * @param message          The actual message we want to send.
     * @param context          An ActorContext instance, used to access the scheduler.
     * @param timeout          FiniteDuration instance - the timeout for the future. Must be strictly positive.
     * @param executionContext Execution context used to run the timeout task and the timer clean-up callback.
     * @param sender           The sender passed to 'tell'. Resolved implicitly (inside an actor this is normally
     *                         'self'); if none is available `ActorRef.noSender` is used.
     * @return Future[Any] that holds the response of the 'ask' operation.
     */
    def ??(message: Any)(implicit context: ActorContext, timeout: FiniteDuration, executionContext: ExecutionContext,
                         sender: ActorRef = ActorRef.noSender): Future[Any] =
      if (timeout.length <= 0)
        // The failed future must be the *result* of the method; otherwise the message would still be sent and a
        // timer scheduled with an invalid delay. Note that a zero timeout is rejected as well.
        Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$ref]"))
      else {
        val senderPromise = Promise[Any]()
        val timeoutTask =
          context.system.scheduler.scheduleOnce(timeout)(senderPromise.tryFailure(new TimeoutException("Ask operation timed out")))
        // Once the promise is completed (reply, failure or the timeout itself) the timer is no longer needed;
        // cancelling it releases the scheduler entry instead of keeping it around until the timeout elapses.
        senderPromise.future.onComplete(_ => timeoutTask.cancel())
        ref.tell(AskMessageWrapper(senderPromise, message), sender)
        senderPromise.future
      }
  }

  /**
   * AskableActor trait provides functions to handle simple ask requests.
   *
   * It defines a way to reply to a sender with a result via the '!!' operator and manages the mapping between ask
   * messages and their promises. The trait also defines the AskableReceive partial function that should be used in
   * the actor's receive method, because it handles messages that are wrapped in AskMessageWrapper instances and
   * registers their promises so that '!!' can find them.
   *
   * NOTE: a registered promise is only removed when it is answered through '!!' or when the wrapped receive block
   * throws. An ask that is never answered stays registered even after its future has timed out.
   */
  trait AskableActor extends Actor {

    /**
     * MessageIdentifier uniquely identifies an ask message received by this actor. It is used to map ask messages
     * to the senders' promises. Generated identifiers start at 1, so 0 never matches a registered promise.
     *
     * @param id The actual message ID.
     */
    case class MessageIdentifier(id: BigInt)

    /** Variable that indicates if the currently processed message is an ask message. */
    private[this] var isCurrentProcessingAsk: Boolean = _

    /**
     * Check if the currently processed message is an ask message.
     *
     * @return True if the currently processed message is an ask, false otherwise.
     */
    def isAsk: Boolean = isCurrentProcessingAsk

    /** Variable that stores the current message ID value. It is used to generate message IDs. */
    private[this] var currentMessageId: BigInt = 0

    /**
     * Generates the next message ID.
     *
     * @return The generated message ID.
     */
    private[this] def nextMessageId: BigInt = {
      currentMessageId += 1
      currentMessageId
    }

    /**
     * Implicit identifier of the message that is currently being processed.
     *
     * While an ask message is being processed this is the identifier generated for it. While a regular message is
     * being processed it is `MessageIdentifier(0)`, which never matches a registered promise, so that '!!' behaves
     * like a plain 'tell' and cannot accidentally complete an earlier ask that is still unanswered. To answer an ask
     * from a later handler, capture this value while the ask is being processed and pass it to '!!' explicitly.
     *
     * @return The current message identifier.
     */
    implicit def messageIdentifier: MessageIdentifier =
      if (isCurrentProcessingAsk) MessageIdentifier(currentMessageId) else MessageIdentifier(0)

    /**
     * A map that stores the promise instance for each pending ask message ID. This way we know which promise to
     * complete for each message.
     */
    private[this] val senderPromises = mutable.Map[BigInt, Promise[Any]]()

    /**
     * Implicit conversion that adds the '!!' method to all ActorRef subtype instances.
     *
     * @param ref The ActorRef we're wrapping.
     * @tparam T Type that indicates we're working with all ActorRef subtypes.
     */
    implicit class ActorRefBang[T <: ActorRef](ref: T) {

      /**
       * The '!!' method does one of two things:
       *
       * 1) If a promise is registered for the given message identifier, the promise is unregistered and completed
       *    with the message; nothing is sent to the wrapped ActorRef.
       * 2) Otherwise it acts like a normal '!' (bang) and sends the message to the wrapped ActorRef via 'tell'.
       *
       * @param message           The message used to complete the promise or to send to the target ActorRef.
       * @param messageIdentifier MessageIdentifier instance that indicates which ask message is being answered.
       *                          If none is in implicit scope the default `MessageIdentifier(0)` is used.
       * @param sender            The sender of the eventual 'tell' message. If none is in implicit scope
       *                          `Actor.noSender` is used.
       * @return Unit
       */
      def !!(message: Any)(implicit messageIdentifier: MessageIdentifier = MessageIdentifier(0),
                           sender: ActorRef = Actor.noSender): Unit =
        // `remove` both looks the promise up and unregisters it in a single step.
        senderPromises.remove(messageIdentifier.id) match {
          case Some(target) => target.trySuccess(message)
          case None         => ref.tell(message, sender)
        }
    }

    /**
     * AskableReceive partial function is used to wrap the Actor's receive partial function so it can intercept and
     * process 'ask' messages.
     *
     * @param r The user defined receive partial function that is being wrapped.
     */
    class AskableReceive(r: Actor.Receive) extends Actor.Receive {

      /**
       * When invoked with an 'ask' message, its promise is registered under a freshly generated message ID and the
       * actual payload is passed to the user defined receive partial function. If that function throws, the promise
       * is unregistered and failed with the same throwable, which is then rethrown. If the message is not an 'ask',
       * the invocation is simply forwarded to the user's receive partial function.
       *
       * @param x The message that the current AskableReceive partial function is invoked with.
       */
      def apply(x: Any): Unit = x match {
        case AskMessageWrapper(senderPromise, payload) =>
          isCurrentProcessingAsk = true
          val messageId = nextMessageId
          senderPromises += (messageId -> senderPromise)
          try r(payload)
          catch {
            // Everything is caught on purpose: the asker must see the failure, and the throwable is always
            // rethrown so the actor's normal failure handling is not affected.
            case t: Throwable =>
              senderPromises -= messageId
              senderPromise.tryFailure(t)
              throw t
          }

        case msg =>
          isCurrentProcessingAsk = false
          r(msg)
      }

      /**
       * Check if the partial function is defined for the provided message.
       *
       * If the message is an 'ask', the payload is extracted and checked against the user's partial function.
       * Otherwise the message itself is checked against the user's partial function.
       *
       * @param x The message that has to be checked against the partial function.
       * @return True if the partial function is defined for this message, false otherwise.
       */
      def isDefinedAt(x: Any): Boolean = x match {
        case AskMessageWrapper(_, payload) => r.isDefinedAt(payload)
        case other                         => r.isDefinedAt(other)
      }
    }

    /** AskableReceive companion object - defines two factory apply methods. */
    object AskableReceive {

      /** Create a new AskableReceive instance wrapping `r`. */
      def apply(r: Actor.Receive): Actor.Receive = new AskableReceive(r)

      /**
       * Create a new AskableReceive instance with the ability to enable receive logging.
       *
       * @param withLogging When true, `r` is wrapped in `LoggingReceive` before being wrapped in AskableReceive.
       * @param r           The user defined receive partial function.
       */
      def apply(withLogging: Boolean = false)(r: Actor.Receive): Actor.Receive =
        if (withLogging) new AskableReceive(LoggingReceive(r))
        else new AskableReceive(r)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment