class EchoActor extends Actor with ActorLogging {
val cluster = Cluster(context.system)
override def preStart() { cluster.subscribe(self, classOf[LeaderChanged]) }
override def postStop() { cluster.unsubscribe(self) }
def receive = {
case state: CurrentClusterState =>"State is $state")
case LeaderChanged(leader) =>"Leader is $leader")
case msg => {"Got msg $msg from $sender")
sender ! msg
class BroadcastMain extends Bootable {
val config = ConfigFactory.load
val system = ActorSystem("Broadcast", config.getConfig("cluster"))
def startup() {
system.actorOf(Props[EchoActor], name = "echo")
def shutdown() { system.shutdown() }
class TestSender(val bridge: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
override def preStart() {
context.system.scheduler.scheduleOnce(5 seconds) {
bridge ! "Hi Echo"
def receive = {
case s => {"Got an echo: " + s)}
class LeaderSender extends Actor with ActorLogging {
val cluster = Cluster(context.system)
var currentLeader: Option[Address] = None
override def preStart() { cluster.subscribe(self, classOf[LeaderChanged]) }
override def postStop() { cluster.unsubscribe(self) }
def receive = {
case state: CurrentClusterState => currentLeader = state.leader
case LeaderChanged(leader) => currentLeader = leader
case msg => {
implicit val timeout = Timeout(1 second)"Sending msg from $sender")
currentLeader.foreach { l =>
Here are two different ways we can send the message
If I pass in a regular actorRef ("normalActorRef")
only the ask & pipeTo returns the message correctly.
If I use the externalActorRef, both of them work.
The second one is what I'm trying for, as it seems to be
more idiomatic
// leaderBroadcast(l).ask(msg).pipeTo(sender)
def leaderBroadcast(l: Address): ActorRef = context.actorFor(RootActorPath(l) / "user" / "echo")
class TestMain extends Bootable {
val configStr = """
akka {
loglevel = INFO
actor {
provider = "akka.remote.RemoteActorRefProvider"
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = ""
port = 0
val config = ConfigFactory.parseString(configStr)
val system = ActorSystem("TestSystem", config)
val clusterSystem = ActorSystem("Broadcast", ConfigFactory.load.getConfig("cluster")
.withValue("akka.remote.netty.port", ConfigValueFactory.fromAnyRef(0)))
def startup() {
clusterSystem.actorOf(Props[LeaderSender], name = "bridge")
/* By using the clusterAddress to create the actorRef we appear
to get differently functioning actorRefs.
val clusterAddress = ExternalAddress(clusterSystem).addressForAkka
val externalActorRef = system.actorFor(RootActorPath(clusterAddress) / "user" / "bridge")
val normalActorRef = clusterSystem.actorFor("/user/bridge")
system.actorOf(Props(new TestSender(externalActorRef)), "testSender")
def shutdown() { system.shutdown() }
object ExternalAddress extends ExtensionKey[ExternalAddressExt]
class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
def addressForAkka: Address = system.provider.getDefaultAddress
