Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import java.util.concurrent.atomic.AtomicInteger
import concurrent.{Future, Promise}
import akka.actor._
import concurrent.duration._
import util.control.NoStackTrace
object RemoteActorResolver {
val resolverCount = new AtomicInteger(0)
type LookupMap = Map[ActorPath, Promise[ActorRef]]
class ActorResolutionException(message: String) extends Exception(message) with NoStackTrace
def props(lookup: LookupMap, timeout: Duration, enableNoMatch: Boolean): Props =
Props(classOf[RemoteActorResolver], lookup, timeout, enableNoMatch)
/**
* Start resolving ActorPaths to ActorRefs
*
* @param factory `ActorContext` or `ActorSystem`
* @param paths Collection of `ActorPaths` to resolve
* @param timeout Timeout after which promises will be failed
* @param enableNoMatch If `true` then receiving `ActorIdentity(path, None)` will cause resolution
* for the path to fail. Otherwise it is ignored and only timeouts cause lookup
* failures.
* @return Map of `ActorPath` to `Promise[ActorRef]` where the `ActorPaths` are
* the `paths` and the promises will be completed when the paths are
* resolved (or failed if the timeout is reached).
*/
def startResolution(factory: ActorRefFactory, paths: Seq[ActorPath], timeout: Duration = 5.minutes,
enableNoMatch: Boolean = false): LookupMap = {
val lookup = paths.map(_ -> Promise[ActorRef]()).toMap
factory.actorOf(props(lookup, timeout, enableNoMatch), "remoteActorResolver" + resolverCount.incrementAndGet())
lookup
}
/**
* Resolve an ActorPath asynchronously
*
* @param lookup Map returned from `StartResolution`
* @param path One of the ActorPaths in the `lookup` map
* @return A `Future[ActorRef]` that will be completed when the `ActorPath` is resolved.
*/
def resolvePathAsync(lookup: LookupMap, path: ActorPath): Future[ActorRef] = lookup(path).future
}
/**
* Using `Identify`/`ActorIdentity`, try to resolve all of the `ActorPath`s in lookup map.
* As each `ActorPath` is successfully or unsuccessfully resolved its corresponding `Promise`
* is completed or failed, respectively.
*
* Stops self after the last path is resolved.
*
* Note: Ignores the case where `ActorIdentity(_, None)` is received. This is because the actor might
* not be created on the target system yet. TODO: maybe handle it optionally.
*
* @param lookup Prepopulated map of `ActorPath` to `Promise[ActorRef]`
* @param timeout After timeout all promises corresponding to unresolved paths will be failed and
* this actor will stop itself.
* @param enableNoMatch If `true` then receiving `ActorIdentity(path, None)` will cause resolution
* for the path to fail. Otherwise it is ignored and only timeouts cause lookup
* failures.
*/
class RemoteActorResolver(lookup: LookupMap, timeout: Duration, enableNoMatch: Boolean) extends Actor with ActorLogging {
val cycleTimeout = 5.seconds
var elapsed = Duration.Zero
context setReceiveTimeout cycleTimeout
sendIdentify()
def sendIdentify() = lookup foreach { case (path, promise) =>
if (!promise.isCompleted) context.system.actorSelection(path) ! Identify(path)
}
def stopIfFinished(): Unit = if (lookup.values.forall(_.isCompleted)) context stop self
def exception(message: String) = new ActorResolutionException(message)
def receive = {
case ActorIdentity(path: ActorPath, Some(ref)) =>
lookup(path) trySuccess ref
stopIfFinished()
case ActorIdentity(path: ActorPath, None) if enableNoMatch =>
lookup(path) tryFailure exception(s"No actor exists at $path")
stopIfFinished()
case ReceiveTimeout =>
elapsed += cycleTimeout
if (elapsed >= timeout) {
lookup filterNot(_._2.isCompleted) foreach { case (path, promise) =>
promise tryFailure exception(s"Timeout waiting for $path to be resolved")
}
context stop self
} else {
sendIdentify()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.