Skip to content

Instantly share code, notes, and snippets.

@danielhopkins
Created March 28, 2013 02:54
Show Gist options
  • Save danielhopkins/5260157 to your computer and use it in GitHub Desktop.
Save danielhopkins/5260157 to your computer and use it in GitHub Desktop.
class EchoActor extends Actor with ActorLogging {
val cluster = Cluster(context.system)
override def preStart() { cluster.subscribe(self, classOf[LeaderChanged]) }
override def postStop() { cluster.unsubscribe(self) }
def receive = {
case state: CurrentClusterState => log.info(s"State is $state")
case LeaderChanged(leader) => log.info(s"Leader is $leader")
case msg => {
log.info(s"Got msg $msg from $sender")
sender ! msg
}
}
}
class BroadcastMain extends Bootable {
val config = ConfigFactory.load
val system = ActorSystem("Broadcast", config.getConfig("cluster"))
def startup() {
system.actorOf(Props[EchoActor], name = "echo")
}
def shutdown() { system.shutdown() }
}
class TestSender(val bridge: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
override def preStart() {
context.system.scheduler.scheduleOnce(5 seconds) {
bridge ! "Hi Echo"
}
}
def receive = {
case s => {log.info("Got an echo: " + s)}
}
}
class LeaderSender extends Actor with ActorLogging {
val cluster = Cluster(context.system)
var currentLeader: Option[Address] = None
override def preStart() { cluster.subscribe(self, classOf[LeaderChanged]) }
override def postStop() { cluster.unsubscribe(self) }
def receive = {
case state: CurrentClusterState => currentLeader = state.leader
case LeaderChanged(leader) => currentLeader = leader
case msg => {
implicit val timeout = Timeout(1 second)
log.info(s"Sending msg from $sender")
currentLeader.foreach { l =>
/*
Here are two different ways we can send the message
If I pass in a regular actorRef ("normalActorRef")
only the ask & pipeTo returns the message correctly.
If I use the externalActorRef, both of them work.
The second one is what I'm trying for, as it seems to be
more idiomatic
*/
// leaderBroadcast(l).ask(msg).pipeTo(sender)
leaderBroadcast(l).forward(msg)
}
}
}
def leaderBroadcast(l: Address): ActorRef = context.actorFor(RootActorPath(l) / "user" / "echo")
}
class TestMain extends Bootable {
val configStr = """
akka {
loglevel = INFO
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
port = 0
}
}
}
"""
val config = ConfigFactory.parseString(configStr)
val system = ActorSystem("TestSystem", config)
val clusterSystem = ActorSystem("Broadcast", ConfigFactory.load.getConfig("cluster")
.withValue("akka.remote.netty.port", ConfigValueFactory.fromAnyRef(0)))
def startup() {
clusterSystem.actorOf(Props[LeaderSender], name = "bridge")
/* By using the clusterAddress to create the actorRef we appear
to get differently functioning actorRefs.
*/
val clusterAddress = ExternalAddress(clusterSystem).addressForAkka
val externalActorRef = system.actorFor(RootActorPath(clusterAddress) / "user" / "bridge")
val normalActorRef = clusterSystem.actorFor("/user/bridge")
system.actorOf(Props(new TestSender(externalActorRef)), "testSender")
}
def shutdown() { system.shutdown() }
}
object ExternalAddress extends ExtensionKey[ExternalAddressExt]
class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
def addressForAkka: Address = system.provider.getDefaultAddress
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment