Skip to content

Instantly share code, notes, and snippets.

@rkuhn
Created October 1, 2013 09:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkuhn/6776048 to your computer and use it in GitHub Desktop.
Save rkuhn/6776048 to your computer and use it in GitHub Desktop.
Staying in Contact with an Actor’s Acquaintances
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorIdentity;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Identify;
import akka.actor.ReceiveTimeout;
import akka.actor.Terminated;
import akka.actor.UntypedActorWithStash;
import akka.japi.Procedure;
public class WatcherForwarder extends UntypedActorWithStash {
final ActorPath remote;
public WatcherForwarder(ActorPath remote) {
this.remote = remote;
inquire();
}
private void inquire() {
getContext().actorSelection(remote).tell(new Identify(getSelf()), getSelf());
getContext().setReceiveTimeout(Duration.create(5, TimeUnit.SECONDS));
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ActorIdentity) {
final ActorIdentity id = (ActorIdentity) msg;
if (id.getRef() != null) {
becomeActive(id.getRef());
}
} else if (msg instanceof ReceiveTimeout) {
inquire();
} else stash();
}
private void becomeActive(final ActorRef ref) {
getContext().watch(ref);
getContext().setReceiveTimeout(Duration.Undefined());
unstashAll();
getContext().become(new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof Terminated) {
inquire();
getContext().unbecome();
} else if (msg instanceof ReceiveTimeout || msg instanceof ActorIdentity) {
// ignore
} else {
ref.tell(msg, getSender());
}
}
}, true);
}
}
import akka.actor._
import scala.concurrent.duration._
class WatcherForwarder(remote: ActorPath) extends Actor with Stash {
def receive = waiting
def inquire() { context.actorSelection(remote) ! Identify(self) }
def waiting: Receive = {
inquire()
context.setReceiveTimeout(10.seconds)
{
case ActorIdentity(`self`, Some(ref)) => context.become(active(ref))
case ActorIdentity(`self`, None) => // ignore
case ReceiveTimeout => inquire()
case _ => stash() // this implies that clients do not resend aggressively
}
}
def active(ref: ActorRef): Receive = {
context.watch(ref)
context.setReceiveTimeout(Duration.Undefined)
unstashAll()
{
case Terminated(`ref`) => context.become(waiting)
case ActorIdentity(`self`, _) => // ignore
case msg => ref forward msg
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment