Skip to content

Instantly share code, notes, and snippets.

@phoenix24
Last active May 19, 2022 18:12
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save phoenix24/6097895 to your computer and use it in GitHub Desktop.
Save phoenix24/6097895 to your computer and use it in GitHub Desktop.
sample akka cluster client.
package spikes.cluster
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension}
/**
 * Demo cluster node hosting the "master" actor.
 *
 * Boots an actor system named "ClusterSystem" bound to 127.0.0.1:2551 (the address
 * that is also listed as the seed node), creates the `master` actor and registers it
 * as a service with the cluster receptionist extension.
 */
object DemoMaster {

  def main(args: Array[String]): Unit = {
    // stripMargin drops the leading "|" markers, so the parsed text equals the plain literal.
    val nodeSettings = ConfigFactory.parseString(
      """
        |akka {
        |actor {
        |provider = "akka.cluster.ClusterActorRefProvider"
        |}
        |remote {
        |transport = "akka.remote.netty.NettyRemoteTransport"
        |log-remote-lifecycle-events = off
        |netty.tcp {
        |hostname = "127.0.0.1"
        |port = 2551
        |}
        |}
        |cluster {
        |seed-nodes = [
        |"akka.tcp://ClusterSystem@127.0.0.1:2551"
        |]
        |roles = [master]
        |auto-down = on
        |}
        |}""".stripMargin)

    val clusterSystem = ActorSystem("ClusterSystem", ConfigFactory.load(nodeSettings))
    val masterRef = clusterSystem.actorOf(Props[ClusterMaster], "master")
    ClusterReceptionistExtension(clusterSystem).registerService(masterRef)
  }

  /** Logs every incoming message with its sender and replies with a fixed greeting. */
  class ClusterMaster extends Actor with ActorLogging {
    def receive: Receive = {
      case incoming =>
        log.info(s"from master : $incoming : $sender")
        sender ! "master : how are you?"
    }
  }
}
/**
 * Demo cluster node hosting the "member" actor.
 *
 * Boots an actor system named "ClusterSystem" bound to 127.0.0.1:3000, pointing at the
 * seed node on port 2551, creates the `member` actor and registers it as a service
 * with the cluster receptionist extension.
 */
object DemoMember {

  // Explicit `: Unit =` instead of procedure syntax, matching DemoMaster.main.
  def main(args: Array[String]): Unit = {
    // stripMargin drops the leading "|" markers, so the parsed text equals the plain literal.
    val config = ConfigFactory.parseString(
      """
        |akka {
        |actor {
        |provider = "akka.cluster.ClusterActorRefProvider"
        |}
        |remote {
        |transport = "akka.remote.netty.NettyRemoteTransport"
        |log-remote-lifecycle-events = off
        |netty.tcp {
        |hostname = "127.0.0.1"
        |port = 3000
        |}
        |}
        |cluster {
        |seed-nodes = [
        |"akka.tcp://ClusterSystem@127.0.0.1:2551"
        |]
        |auto-down = on
        |}
        |}""".stripMargin)

    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
    val clusterMember = system.actorOf(Props[ClusterMember], "member")
    ClusterReceptionistExtension(system).registerService(clusterMember)
  }

  /** Logs every incoming message with its sender and replies with a fixed greeting. */
  class ClusterMember extends Actor with ActorLogging {
    def receive: Receive = {
      case e =>
        log.info(s"from member : $e : $sender")
        sender ! "member : how are you?"
    }
  }
}
/**
 * Demo client that talks to the cluster from outside of it.
 *
 * Boots a remoting-only actor system named "OTHERSYSTEM" on 127.0.0.1:5000, creates a
 * ClusterClient whose initial contacts are the receptionists on ports 2551 and 3000,
 * and then sends a greeting to "/user/master" and "/user/member" once per second,
 * 1000 times.
 */
object DemoClient {

  // Explicit `: Unit =` instead of procedure syntax, matching DemoMaster.main.
  def main(args: Array[String]): Unit = {
    // stripMargin drops the leading "|" markers, so the parsed text equals the plain literal.
    val config = ConfigFactory.parseString(
      """
        |akka {
        |actor {
        |provider = "akka.remote.RemoteActorRefProvider"
        |}
        |remote {
        |transport = "akka.remote.netty.NettyRemoteTransport"
        |log-remote-lifecycle-events = off
        |netty.tcp {
        |hostname = "127.0.0.1"
        |port = 5000
        |}
        |}
        |}""".stripMargin)

    val system = ActorSystem("OTHERSYSTEM", ConfigFactory.load(config))

    val initialContacts = Set(
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"),
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/receptionist"))

    val c = system.actorOf(ClusterClient.props(initialContacts), "os-client")

    // foreach rather than map: the loop runs only for its side effects, so there is
    // no reason to build a result collection.
    (1 to 1000).foreach { i =>
      c ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true)
      c ! ClusterClient.Send("/user/member", s"hello - $i", localAffinity = true)
      Thread.sleep(1000)
    }
  }
}
@Yord
Copy link

Yord commented Aug 7, 2013

Thank you very much. This saved me a lot of work!

@asethia
Copy link

asethia commented Jan 4, 2016

Excellent work... I was looking for this code. Thanks

Using Akka 2.4.x, it would require the following changes:

  1. import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} to import akka.cluster.client.{ClusterClient,ClusterClientReceptionist}

  2. ClusterReceptionistExtension(system).registerService(master) to ClusterClientReceptionist(system).registerService(master)

  3. val initialContacts = Set(
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"),
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/receptionist")) to

    val initialContacts = Set(
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"),
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/system/receptionist"))

@aubreylouw
Copy link

To avoid dead-letters right after the receptionist cluster is up, add akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"] to the application.conf

@atatous
Copy link

atatous commented Aug 14, 2018

I needed to update this code for the latest Akka version. Also, I added code so that the ClusterClient sends its own actorRef in order to get a reply back directly, and took out DemoMember. The code below is tested with sbt.version = 0.13.15.

build.sbt
// Build definition for the cluster-client demo.
name := "ClusterClient"
version := "0.1"
scalaVersion := "2.12.4"
// Resolve over HTTPS so artifacts are not fetched over an unencrypted connection.
resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.14"

import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import com.typesafe.config.ConfigFactory

/**
 * Demo client that talks to the cluster from outside of it.
 *
 * Boots a remoting-only actor system named "OUTSIDER-SYSTEM" on localhost:5000 and
 * creates a ClusterClient whose initial contact is the receptionist on port 2551.
 * It first sends the master a reference to a local logging actor, then sends ten
 * greetings to "/user/master", pausing ten seconds after each one.
 */
object DemoClient {

  // Explicit `: Unit =` instead of the deprecated procedure syntax.
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
     akka {
       log-dead-letters = OFF
       actor {
         provider = "akka.remote.RemoteActorRefProvider"
       }

       remote {
         transport = "akka.remote.netty.NettyRemoteTransport"
         log-remote-lifecycle-events = off
         netty.tcp {
          hostname = "localhost"
          port = 5000
         }
       }
     }""")

    val system = ActorSystem("OUTSIDER-SYSTEM", ConfigFactory.load(config))

    val initialContacts = Set(
      ActorPath.fromString("akka.tcp://ClusterSystem@localhost:2551/system/receptionist"))

    val cc = system.actorOf(ClusterClient.props(
      ClusterClientSettings(system).withInitialContacts(initialContacts)), "os-client")

    val ccActor = system.actorOf(Props[ClusterClientActor], "ccActor")

    // Send the local actor's reference as the message payload before any greeting.
    cc ! ClusterClient.Send("/user/master", ccActor, localAffinity = true)

    // foreach rather than map: the loop runs only for its side effects, so there is
    // no reason to build a result collection.
    (1 to 10).foreach { i =>
      cc ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true)
      Thread.sleep(10000)
    }
  }

  /** Logs every message it receives together with that message's sender. */
  class ClusterClientActor extends Actor with ActorLogging {
    def receive: Receive = {
      case e =>
        log.info(s"from cluster-client : $e : ${sender()}")
    }
  }
}


import akka.actor.{ActorRef, _}
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory

/**
 * Demo cluster node hosting the "master" actor.
 *
 * Boots an actor system named "ClusterSystem" bound to localhost:2551 (the address
 * that is also listed as the seed node), creates the `master` actor and registers it
 * as a service with the ClusterClientReceptionist extension.
 */
object DemoMaster {

  def main(args: Array[String]): Unit = {

    val config = ConfigFactory.parseString("""
     akka {
       log-dead-letters = OFF
       extensions = ["akka.cluster.client.ClusterClientReceptionist"]
       actor {
         provider = "akka.cluster.ClusterActorRefProvider"
       }
       remote {
         transport = "akka.remote.netty.NettyRemoteTransport"
         log-remote-lifecycle-events = off
         netty.tcp {
           hostname = "localhost"
           port = 2551
         }
       }
       cluster {
         seed-nodes = [
           "akka.tcp://ClusterSystem@localhost:2551"
           ]
         roles = [master]
         auto-down = on
       }
     }""")

    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))

    val master = system.actorOf(Props[ClusterMaster], "master")
    ClusterClientReceptionist(system).registerService(master)
  }

  /**
   * Remembers the most recent ActorRef it was sent and replies to that reference
   * (rather than to the message sender) whenever any other message arrives.
   */
  class ClusterMaster extends Actor with ActorLogging {
    // Reply target supplied by the client; None until an ActorRef message has arrived.
    private var replyTo: Option[ActorRef] = None

    def receive: Receive = {
      case a: ActorRef =>
        log.info(s"from master : $a : ${sender()}")
        replyTo = Some(a)
      case e =>
        // orNull renders "null" in the log line while no reply target is known.
        log.info(s"from master : $e : ${replyTo.orNull}")
        // No reply is sent until a reply target has been received.
        replyTo.foreach(_ ! "master : how are you?")
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment