Skip to content

Instantly share code, notes, and snippets.

@patriknw
Created August 19, 2013 17:39
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save patriknw/6271877 to your computer and use it in GitHub Desktop.
Save patriknw/6271877 to your computer and use it in GitHub Desktop.
Client side subscriber
object TopicAssistant {
case class Subscribe(topic: String)
case object SubscribeAck
}
// Start this actor with name "topicAssistant" on each node in the cluster
class TopicAssistant extends Actor {
import TopicAssistant._
ClusterReceptionistExtension(context.system).registerService(self)
def receive = {
case Subscribe(topic) ⇒
context.watch(sender)
DistributedPubSubExtension(context.system).mediator ! DistributedPubSubMediator.Subscribe(topic, sender)
sender ! SubscribeAck
}
}
class ClientSubscriber extends Actor {
def initialContacts = Set(
context.actorSelection("akka.tcp://ClusterSystem@host1:2552/user/receptionist"),
context.actorSelection("akka.tcp://ClusterSystem@host2:2552/user/receptionist"))
val clusterClient = context.actorOf(ClusterClient.props(initialContacts), "clusterClient")
def scheduleTick(): Unit =
context.system.scheduler.scheduleOnce(3.seconds, self, "tick")(context.dispatcher)
override def preStart(): Unit = {
super.preStart()
scheduleTick()
}
val subscribeMsg =
ClusterClient.Send("/user/topicAssistant", TopicAssistant.Subscribe("content"), localAffinity = true)
def receive = {
case "tick" ⇒
clusterClient ! subscribeMsg
scheduleTick()
case TopicAssistant.SubscribeAck ⇒
context.watch(sender)
context.become(initialized, discardOld = false)
}
def initialized(): Actor.Receive = {
case Terminated(ref) ⇒
scheduleTick()
context.unbecome()
case "tick" ⇒ // ok
case msg ⇒ println("GOT IT FROM TOPIC: " + msg)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment