Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
sample akka cluster client.
package spikes.cluster
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension}
/** Starts the `ClusterSystem` node on 127.0.0.1:2551 (also the configured seed node) and
  * exposes a `master` actor through the cluster receptionist.
  */
object DemoMaster {

  /** Entry point; `args` is not read. */
  def main(args: Array[String]): Unit = {
    // The inline HOCON is parsed first and then handed to ConfigFactory.load,
    // exactly as before; the literal is kept verbatim.
    val nodeSettings = ConfigFactory.load(ConfigFactory.parseString("""
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"
]
roles = [master]
auto-down = on
}
}"""))

    val clusterSystem = ActorSystem("ClusterSystem", nodeSettings)
    val masterRef = clusterSystem.actorOf(Props[ClusterMaster], "master")
    // Registration with the receptionist is what makes the actor addressable by a ClusterClient.
    ClusterReceptionistExtension(clusterSystem).registerService(masterRef)
  }

  /** Logs every incoming message together with its sender and answers with a fixed greeting. */
  class ClusterMaster extends Actor with ActorLogging {
    def receive = {
      case incoming =>
        log.info(s"from master : $incoming : $sender")
        sender ! "master : how are you?"
    }
  }
}
/** Starts a second `ClusterSystem` node on 127.0.0.1:3000 that joins via the seed node on
  * port 2551 and exposes a `member` actor through the cluster receptionist.
  */
object DemoMember {

  /** Entry point; `args` is not read.
    *
    * Declared with an explicit `: Unit =` result type (instead of procedure syntax) so it
    * matches `DemoMaster.main` and does not rely on a deprecated language form.
    */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 3000
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"
]
auto-down = on
}
}""")
    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
    val clusterMember = system.actorOf(Props[ClusterMember], "member")
    // Registration with the receptionist is what makes the actor addressable by a ClusterClient.
    ClusterReceptionistExtension(system).registerService(clusterMember)
  }

  /** Logs every incoming message together with its sender and answers with a fixed greeting. */
  class ClusterMember extends Actor with ActorLogging {
    def receive = {
      case e =>
        log.info(s"from member : $e : $sender")
        sender ! "member : how are you?"
    }
  }
}
/** Stand-alone actor system (`OTHERSYSTEM`, remote provider only) that talks to the cluster
  * through a `ClusterClient` and logs the replies it gets back.
  */
object DemoClient {

  /** Entry point; `args` is not read.
    *
    * Sends one message per second to `/user/master` and `/user/member`, 1000 rounds in total.
    */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 5000
}
}
}""")
    val system = ActorSystem("OTHERSYSTEM", ConfigFactory.load(config))
    val initialContacts = Set(
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"),
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/receptionist"))
    val c = system.actorOf(ClusterClient.props(initialContacts), "os-client")

    // Replies can only be delivered to an actor. Sending from plain `main` code leaves the
    // sender as dead letters, so the cluster's answers were silently dropped.
    val replies = system.actorOf(Props[ReplyLogger], "replies")

    // foreach instead of map: the loop exists only for its side effects.
    (1 to 1000).foreach { i =>
      c.tell(ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true), replies)
      c.tell(ClusterClient.Send("/user/member", s"hello - $i", localAffinity = true), replies)
      Thread.sleep(1000)
    }
  }

  /** Receives the replies sent back by the cluster actors and logs them. */
  class ReplyLogger extends Actor with ActorLogging {
    def receive = {
      case reply =>
        log.info(s"from client : $reply : $sender")
    }
  }
}
@rkuhn

This comment has been minimized.

Copy link

commented Jul 28, 2013

Replies can be received by an actor, so you have to create an actor on the client first. Then it would be most convenient to move the sending of the messages into that actor which will automatically set the right sender reference for the ! method (lines 117/118). Handle messages of type String and you will see your replies.

@Yord

This comment has been minimized.

Copy link

commented Aug 7, 2013

Thank you very much. This saved me a lot of work!

@asethia

This comment has been minimized.

Copy link

commented Jan 4, 2016

Excellent work... I was looking for this code. Thanks

Using Akka 2.4.x, it would require the following changes:

  1. import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} to import akka.cluster.client.{ClusterClient,ClusterClientReceptionist}

  2. ClusterReceptionistExtension(system).registerService(master) to ClusterClientReceptionist(system).registerService(master)

  3. val initialContacts = Set(
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"),
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/receptionist")) to

    val initialContacts = Set(
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"),
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/system/receptionist"))

@aubreylouw

This comment has been minimized.

Copy link

commented Jun 7, 2016

To avoid dead-letters right after the receptionist cluster is up, add akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"] to the application.conf

@atatous

This comment has been minimized.

Copy link

commented Aug 14, 2018

I needed to update this code to the latest Akka version. Also, I added code so that the ClusterClient sends its own actorRef in order to get a reply back directly, and took out DemoMember. The code below is tested with sbt.version = 0.13.15.

build.sbt
// Build definition for the cluster-client sample.
name := "ClusterClient"
version := "0.1"
scalaVersion := "2.12.4"

// Resolve over HTTPS: artifacts fetched over plain HTTP can be tampered with in transit.
resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"

// Single source of truth so both Akka modules always stay on the same version.
val akkaVersion = "2.5.14"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion
)

import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import com.typesafe.config.ConfigFactory

/** Stand-alone actor system (`OUTSIDER-SYSTEM`, remote provider only) that talks to the
  * cluster through a `ClusterClient`.
  *
  * Protocol used with the master: the first message carries the `ActorRef` of a local actor;
  * the following messages are plain strings.
  */
object DemoClient {

  /** Entry point; `args` is not read.
    *
    * Sends the local reply actor's reference once, then ten greetings spaced ten seconds apart.
    */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
     akka {
       log-dead-letters = OFF
       actor {
         provider = "akka.remote.RemoteActorRefProvider"
       }

       remote {
         transport = "akka.remote.netty.NettyRemoteTransport"
         log-remote-lifecycle-events = off
         netty.tcp {
          hostname = "localhost"
          port = 5000
         }
       }
     }""")

    val system = ActorSystem("OUTSIDER-SYSTEM", ConfigFactory.load(config))

    val initialContacts = Set(
      ActorPath.fromString("akka.tcp://ClusterSystem@localhost:2551/system/receptionist"))

    val cc = system.actorOf(ClusterClient.props(
      ClusterClientSettings(system).withInitialContacts(initialContacts)), "os-client")

    val ccActor = system.actorOf(Props[ClusterClientActor], "ccActor")

    // Hand the reply actor's reference to the master before sending any greeting.
    cc ! ClusterClient.Send("/user/master", ccActor, localAffinity = true)

    // foreach instead of map: the loop exists only for its side effects.
    (1 to 10).foreach { i =>
      cc ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true)
      Thread.sleep(10000)
    }
  }

  /** Logs whatever it receives, together with the sender of the message. */
  class ClusterClientActor extends Actor with ActorLogging {
    def receive: Receive = {
      case e =>
        log.info(s"from cluster-client : $e : $sender")
    }
  }
}


import akka.actor.{ActorRef, _}
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory

/** Starts the `ClusterSystem` node on localhost:2551 (also the configured seed node) and
  * exposes a `master` actor through the cluster-client receptionist.
  *
  * NOTE(review): the `auto-down` key in the inline config may not be recognised by the Akka
  * version this sample targets — confirm against that version's reference configuration.
  */
object DemoMaster {

  /** Entry point; `args` is not read. */
  def main(args: Array[String]): Unit = {

    val config = ConfigFactory.parseString("""
     akka {
       log-dead-letters = OFF
       extensions = ["akka.cluster.client.ClusterClientReceptionist"]
       actor {
         provider = "akka.cluster.ClusterActorRefProvider"
       }
       remote {
         transport = "akka.remote.netty.NettyRemoteTransport"
         log-remote-lifecycle-events = off
         netty.tcp {
           hostname = "localhost"
           port = 2551
         }
       }
       cluster {
         seed-nodes = [
           "akka.tcp://ClusterSystem@localhost:2551"
           ]
         roles = [master]
         auto-down = on
       }
     }""")

    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))

    val master = system.actorOf(Props[ClusterMaster], "master")
    // Registration with the receptionist is what makes the actor addressable by a ClusterClient.
    ClusterClientReceptionist(system).registerService(master)
  }

  /** Remembers the most recent `ActorRef` it was sent and answers every other message by
    * sending a fixed greeting to that reference. Messages that arrive before any reference
    * has been received are logged but not answered.
    */
  class ClusterMaster extends Actor with ActorLogging {
    // Reply target announced by the client; None until the first ActorRef message arrives.
    private var senderActor: Option[ActorRef] = None

    def receive: Receive = {
      case a: ActorRef =>
        log.info(s"from master : $a : $sender")
        senderActor = Some(a)
      case e =>
        // orNull keeps the logged text identical to the previous null-based field
        // when no reply target has been registered yet.
        log.info(s"from master : $e : ${senderActor.orNull}")
        senderActor.foreach(_ ! "master : how are you?")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.