Created
October 1, 2013 09:36
-
-
Save rkuhn/6776048 to your computer and use it in GitHub Desktop.
Staying in Contact with an Actor’s Acquaintances
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import scala.concurrent.duration.Duration; | |
import akka.actor.ActorIdentity; | |
import akka.actor.ActorPath; | |
import akka.actor.ActorRef; | |
import akka.actor.Identify; | |
import akka.actor.ReceiveTimeout; | |
import akka.actor.Terminated; | |
import akka.actor.UntypedActorWithStash; | |
import akka.japi.Procedure; | |
public class WatcherForwarder extends UntypedActorWithStash { | |
final ActorPath remote; | |
public WatcherForwarder(ActorPath remote) { | |
this.remote = remote; | |
inquire(); | |
} | |
private void inquire() { | |
getContext().actorSelection(remote).tell(new Identify(getSelf()), getSelf()); | |
getContext().setReceiveTimeout(Duration.create(5, TimeUnit.SECONDS)); | |
} | |
@Override | |
public void onReceive(Object msg) throws Exception { | |
if (msg instanceof ActorIdentity) { | |
final ActorIdentity id = (ActorIdentity) msg; | |
if (id.getRef() != null) { | |
becomeActive(id.getRef()); | |
} | |
} else if (msg instanceof ReceiveTimeout) { | |
inquire(); | |
} else stash(); | |
} | |
private void becomeActive(final ActorRef ref) { | |
getContext().watch(ref); | |
getContext().setReceiveTimeout(Duration.Undefined()); | |
unstashAll(); | |
getContext().become(new Procedure<Object>() { | |
@Override | |
public void apply(Object msg) throws Exception { | |
if (msg instanceof Terminated) { | |
inquire(); | |
getContext().unbecome(); | |
} else if (msg instanceof ReceiveTimeout || msg instanceof ActorIdentity) { | |
// ignore | |
} else { | |
ref.tell(msg, getSender()); | |
} | |
} | |
}, true); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import scala.concurrent.duration._ | |
class WatcherForwarder(remote: ActorPath) extends Actor with Stash { | |
def receive = waiting | |
def inquire() { context.actorSelection(remote) ! Identify(self) } | |
def waiting: Receive = { | |
inquire() | |
context.setReceiveTimeout(10.seconds) | |
{ | |
case ActorIdentity(`self`, Some(ref)) => context.become(active(ref)) | |
case ActorIdentity(`self`, None) => // ignore | |
case ReceiveTimeout => inquire() | |
case _ => stash() // this implies that clients do not resend aggressively | |
} | |
} | |
def active(ref: ActorRef): Receive = { | |
context.watch(ref) | |
context.setReceiveTimeout(Duration.Undefined) | |
unstashAll() | |
{ | |
case Terminated(`ref`) => context.become(waiting) | |
case ActorIdentity(`self`, _) => // ignore | |
case msg => ref forward msg | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment