Akka DistributedPubSubExtension multi-node cluster sample
package test.akka
import{Props, ActorLogging, Actor, ActorSystem}
import akka.cluster.Cluster
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator.{SubscribeAck, Subscribe, Publish}
import com.typesafe.config.ConfigFactory
* Created by
* @author Basel Darvish
object AkkaDistributedPubSub extends App{
val systemC1 = ActorSystem("system1", ConfigFactory.load("c1"))
val systemC2 = ActorSystem("system1", ConfigFactory.load("c2"))
Thread.sleep(10000)//waiting for cluster to initiate
Thread.sleep(3000)//waiiting for subscriber to initiate
val publisher = systemC2.actorOf(Props(classOf[Publisher]))
publisher ! "Hey!"
class Subscriber extends Actor with ActorLogging {
val mediator = DistributedPubSubExtension(context.system).mediator
mediator ! Subscribe("t", self)
log info("subscribed to: {}", "t")
def receive = {
case SubscribeAck(Subscribe("t", _, `self`))
context become ready
def ready: Receive = {
case s:String =>
log info("++++++++++++++++ Received message: {}", s)
case a:Any =>
log info("Received unknown message: {}", a)
class Publisher extends Actor with ActorLogging {
val mediator = DistributedPubSubExtension(context.system).mediator
override def receive: Receive = {
case s:String =>
log info("________________Publishing message: {}", s)
mediator ! Publish("t", s)
extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
remote {
netty.tcp {
hostname = ""
port = 2552
cluster {
seed-nodes = [
auto-down = on
auto-down-unreachable-after = 5s
extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
remote {
netty.tcp {
hostname = ""
port = 2551
cluster {
seed-nodes = [
auto-down = on
auto-down-unreachable-after = 5s
