Created
August 23, 2013 09:09
-
-
Save patriknw/6317142 to your computer and use it in GitHub Desktop.
Identify actors matching wildcard ActorSelection.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package akka.contrib.pattern | |
import akka.actor.ActorIdentity | |
import akka.actor.Props | |
import akka.actor.ActorSelection | |
import scala.concurrent.duration.Deadline | |
import akka.actor.ReceiveTimeout | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.actor.Identify | |
import akka.actor.Deploy | |
import akka.actor.LocalScope | |
/**
 * Reply message carrying the actor references that were collected.
 *
 * @param refs the distinct `ActorRef`s gathered by the collector
 */
@SerialVersionUID(1L)
case class ResolveManyResult(refs: Set[ActorRef]) {

  /**
   * Java API: the collected references exposed as a `java.util.Set`.
   */
  def getRefs: java.util.Set[ActorRef] = {
    import scala.collection.JavaConverters._
    refs.asJava
  }
}
object ResolveManyCollector {

  /**
   * Creates the `Props` for a [[ResolveManyCollector]] with a `LocalScope` deployment.
   *
   * The arguments handed to `Props` are matched against the constructor of
   * [[ResolveManyCollector]], so all four constructor parameters must be
   * supplied, in declaration order.
   *
   * @param selection    the (possibly wildcard) selection to send `Identify` to
   * @param max          number of `ActorIdentity` replies after which the collector completes
   * @param deadline     point in time after which the collector completes regardless of `max`
   * @param sendResultTo recipient of the final [[ResolveManyResult]]
   * @return `Props` for creating the collector actor
   */
  def props(selection: ActorSelection, max: Int, deadline: Deadline, sendResultTo: ActorRef): Props =
    Props(classOf[ResolveManyCollector], selection, max, deadline, sendResultTo)
      .withDeploy(Deploy(scope = LocalScope))
}
/**
 * Resolves the actors behind a wildcard `ActorSelection`.
 *
 * On construction the actor sends `Identify` to `selection` and then
 * gathers the `ActorIdentity` replies. Once `max` replies have arrived,
 * or the `deadline` has passed, it sends a [[ResolveManyResult]] with
 * the collected references to `sendResultTo` and stops itself.
 *
 * Every `ActorIdentity` reply counts towards `max`, whether or not it
 * carries an actor reference.
 *
 * @param selection    the selection to send `Identify` to
 * @param max          number of `ActorIdentity` replies to wait for
 * @param deadline     latest point in time to wait for replies
 * @param sendResultTo recipient of the [[ResolveManyResult]]
 */
class ResolveManyCollector(
  selection: ActorSelection,
  max: Int,
  deadline: Deadline,
  sendResultTo: ActorRef) extends Actor {

  // Number of ActorIdentity replies received so far.
  var count = 0
  // Distinct actor references extracted from the replies.
  var result = Set.empty[ActorRef]

  // Arm the receive timeout with the time remaining, then probe the selection.
  context.setReceiveTimeout(deadline.timeLeft)
  selection ! Identify(None)

  def receive = {
    case ActorIdentity(_, refOption) =>
      count += 1
      refOption match {
        case Some(ref) => result = result + ref
        case None      => // reply without a reference: counted, nothing to add
      }
      // Re-arm the timeout with whatever time is left until the deadline.
      context.setReceiveTimeout(deadline.timeLeft)
      val finished = deadline.isOverdue || count == max
      if (finished) completeResult()

    case ReceiveTimeout =>
      completeResult()
  }

  /**
   * Sends the collected references to `sendResultTo` and stops this actor.
   */
  def completeResult(): Unit = {
    sendResultTo ! ResolveManyResult(result)
    context.stop(self)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment