Skip to content

Instantly share code, notes, and snippets.

@antoniy
Last active December 24, 2015 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save antoniy/1db41c6b0bb411bdcbe0 to your computer and use it in GitHub Desktop.
Save antoniy/1db41c6b0bb411bdcbe0 to your computer and use it in GitHub Desktop.
Simple Ask pattern for Akka.
import akka.actor.{Actor, ActorContext, ActorRef}
import akka.event.LoggingReceive
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent._
/**
* Implements a simple ask pattern that works in a single-JVM setup, so that futures can conveniently be used as
* responses without the additional actor creation penalty.
*
* User: Antoniy Chonkov <antoniy@gmail.com>
* Date: 10/3/13
* Time: 11:12 PM
*/
object SimpleAskPattern {
/**
* Message wrapper that is used with the underlying 'tell' operation. It holds the sender's promise for the result
* as well as the actual message payload.
*
* @param senderPromise Promise[Any] instance that is to be completed with the response.
* @param payload The actual message that was sent to the target actor.
*/
case class AskMessageWrapper(senderPromise: Promise[Any], payload: Any)
/**
 * Implicit conversion that adds the '??' method to ActorRef instances.
 *
 * @param ref The ActorRef instance being wrapped.
 * @tparam T The actual type; may be any subtype of ActorRef.
 */
implicit class AskableActorRef[T <: ActorRef](ref: T) {
  /**
   * Ask operation. It sends the message to the wrapped actor via 'tell', wrapped in an AskMessageWrapper together
   * with a promise for the result, and returns the future of that promise.
   *
   * A timeout task is scheduled on the actor system's scheduler; if the promise has not been completed when the
   * timeout elapses, it is failed with a TimeoutException. The task is cancelled once the promise completes.
   *
   * If the timeout length is not positive, nothing is scheduled or sent and the returned future is already failed
   * with an IllegalArgumentException.
   *
   * @param message The actual message we want to send.
   * @param context An ActorContext instance, used to access the scheduler.
   * @param timeout FiniteDuration instance - the timeout for the future.
   * @param executionContext Execution context used to run the timeout task and the cancellation callback.
   * @param sender The sender passed to 'tell'; defaults to 'ActorRef.noSender' when no implicit ActorRef is
   *               in scope.
   * @return Future[Any] that holds the response of the 'ask' operation.
   */
  def ??(message: Any)(implicit context: ActorContext, timeout: FiniteDuration, executionContext: ExecutionContext,
                       sender: ActorRef = ActorRef.noSender): Future[Any] =
    if (timeout.length <= 0)
      // Return the failure itself: for an unusable timeout the question must not be sent at all.
      Future.failed[Any](
        new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$ref]"))
    else {
      val senderPromise = Promise[Any]()
      val timeoutTask = context.system.scheduler.scheduleOnce(timeout) {
        senderPromise.tryFailure(new TimeoutException("Ask operation timed out"))
      }
      // Once the promise is completed (by a reply or by the timeout) the scheduled task is of no further use;
      // cancelling it avoids keeping it queued in the scheduler for the rest of the timeout.
      senderPromise.future.onComplete(_ => timeoutTask.cancel())
      ref.tell(AskMessageWrapper(senderPromise, message), sender)
      senderPromise.future
    }
}
/**
 * AskableActor provides the receiving side of the simple ask pattern.
 *
 * It lets an actor reply with the '!!' operator and keeps the mapping between each incoming ask message and the
 * promise of its sender. The actor's receive partial function should be wrapped in AskableReceive, which unwraps
 * messages that arrive inside an AskMessageWrapper and registers their promises before the user code runs.
 *
 * A registered promise is removed only when it is answered through '!!' or when the wrapped receive throws while
 * handling the ask message.
 *
 * NOTE(review): the bookkeeping below is plain, unsynchronized mutable state; this presumably relies on '!!' being
 * invoked only while the actor is processing a message - confirm against callers.
 */
trait AskableActor extends Actor {
  /**
   * MessageIdentifier uniquely identifies an ask message received by this actor. It is used to map ask messages
   * to their senders' promises.
   *
   * @param id The actual message ID.
   */
  case class MessageIdentifier(id: BigInt)

  /** Indicates whether the message most recently handed to AskableReceive was an ask message. */
  private[this] var isCurrentProcessingAsk: Boolean = false

  /**
   * Check if the currently processed message is an ask message.
   *
   * @return True if the currently processed message is an ask message, false otherwise.
   */
  def isAsk: Boolean = isCurrentProcessingAsk

  /** Last generated message ID. Generated IDs start at 1, so 0 never identifies a registered promise. */
  private[this] var currentMessageId: BigInt = 0

  /**
   * Generates the next message ID.
   *
   * @return The generated message ID.
   */
  private[this] def nextMessageId: BigInt = { currentMessageId += 1; currentMessageId }

  /**
   * Puts the identifier of the current message into implicit scope, where '!!' picks it up.
   *
   * If the message most recently handed to AskableReceive was an ask message, this is the identifier generated
   * for it. Otherwise it is MessageIdentifier(0), which never has a registered promise, so '!!' behaves like a
   * plain 'tell' and cannot complete the promise of an earlier ask message that is still unanswered.
   *
   * @return The current message identifier.
   */
  implicit def messageIdentifier: MessageIdentifier =
    if (isCurrentProcessingAsk) MessageIdentifier(currentMessageId)
    else MessageIdentifier(0)

  /**
   * Stores the promise registered for each ask message ID, so that '!!' knows which promise to complete for
   * which message.
   */
  private[this] val senderPromises = mutable.Map[BigInt, Promise[Any]]()

  /**
   * Implicit conversion that adds the '!!' method to all ActorRef subtype instances.
   *
   * @param ref The ActorRef we're wrapping.
   * @tparam T Type that indicates we're working with all ActorRef subtypes.
   */
  implicit class ActorRefBang[T <: ActorRef](ref: T) {
    /**
     * The '!!' method does one of two things:
     *
     * 1) If a promise is registered under the given message identifier, the promise is removed from the registry
     *    and completed with the message; the wrapped ActorRef is not used in that case.
     * 2) Otherwise it acts like a normal '!' (bang) and sends the message to the wrapped ActorRef via 'tell'.
     *
     * @param message The message used to complete the promise or to send to the target ActorRef via 'tell'.
     * @param messageIdentifier MessageIdentifier instance that indicates which ask message is being answered.
     *                          If no implicit is in scope, MessageIdentifier(0) is used.
     * @param sender The sender of the eventual 'tell' message. If no implicit is in scope, 'Actor.noSender'
     *               is used.
     * @return Unit
     */
    def !!(message: Any)(implicit messageIdentifier: MessageIdentifier = MessageIdentifier(0),
                         sender: ActorRef = Actor.noSender): Unit =
      // 'remove' both looks the promise up and unregisters it, so each ask is answered at most once from here.
      senderPromises.remove(messageIdentifier.id) match {
        case Some(target) => target.trySuccess(message)
        case None => ref.tell(message, sender)
      }
  }

  /**
   * AskableReceive wraps the Actor's receive partial function so it can intercept and process 'ask' messages.
   *
   * @param r The user defined receive partial function that is being wrapped.
   */
  class AskableReceive(r: Actor.Receive) extends Actor.Receive {
    /**
     * Handles a message. An 'ask' message gets a freshly generated message ID under which its promise is
     * registered, and its payload is then passed to the user defined receive partial function. Any other message
     * is forwarded to the user defined receive partial function unchanged.
     *
     * If the user defined partial function throws while handling an ask payload, the promise is unregistered and
     * failed with the thrown exception, which is then rethrown.
     *
     * @param x The message that the current AskableReceive partial function is invoked with.
     */
    def apply(x: Any): Unit = x match {
      case AskMessageWrapper(senderPromise, payload) =>
        isCurrentProcessingAsk = true
        val messageId = nextMessageId
        senderPromises(messageId) = senderPromise
        try r(payload)
        catch {
          // Throwable is caught on purpose: the asker is notified of the failure and the exception is always
          // rethrown, so nothing is swallowed here.
          case t: Throwable =>
            senderPromises -= messageId
            senderPromise.tryFailure(t)
            throw t
        }
      case msg =>
        isCurrentProcessingAsk = false
        r(msg)
    }

    /**
     * Check if the partial function is defined for the provided message.
     *
     * For an 'ask' message the check is made against the wrapped payload; for any other message it is made
     * against the message itself.
     *
     * @param x The message that has to be checked against the partial function.
     * @return True if the partial function is defined for this message, false otherwise.
     */
    def isDefinedAt(x: Any): Boolean = x match {
      case AskMessageWrapper(_, payload) => r.isDefinedAt(payload)
      case _ => r.isDefinedAt(x)
    }
  }

  /** AskableReceive companion object - defines two factory apply methods. */
  object AskableReceive {
    /** Create a new AskableReceive instance. */
    def apply(r: Actor.Receive): Actor.Receive = new AskableReceive(r)

    /** Create a new AskableReceive instance, wrapping the receive in LoggingReceive when withLogging is true. */
    def apply(withLogging: Boolean = false)(r: Actor.Receive): Actor.Receive =
      if (withLogging) new AskableReceive(LoggingReceive(r))
      else new AskableReceive(r)
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment