Skip to content

Instantly share code, notes, and snippets.

@freekh
Created October 12, 2011 18:35
Show Gist options
  • Save freekh/1282114 to your computer and use it in GitHub Desktop.
Save freekh/1282114 to your computer and use it in GitHub Desktop.
n00b fault tolerant load balancing
package akka.training
import akka.actor.Supervisor
import akka.config.Supervision._
import akka.actor.Actor
import akka.actor.Actor._
import akka.routing._
import akka.config.Supervision.OneForOneStrategy
import akka.actor.RemoteActorRef
import akka.actor.ActorRef
import akka.serialization.ActorSerialization._
import BinaryFormatMyStatelessActor._
import akka.util.duration._
import akka.event.EventHandler
//getting a tuple but I just wanted to send self (just a reference but it did not work because of serialization).
// strange because I am quite sure I sent the actorref (self) Oh well, maybe too hungry to figure it out
case class IsAlive(service : String, hostname : String, port : Int)
case object Alive
class FTPrinter(service : String, hostname : String, port : Int) extends Actor {
def receive = {
case Alive => self.channel ! IsAlive(service, hostname, port)
case m => println("%s: %s" format (service, m))
}
}
class FTLBDispatcher extends Actor {
val nodes = Set(("ponger", Nodes.Two._1, Nodes.Two._2), ("pinger", Nodes.Three._1, Nodes.Three._2))
def remoteActorForId(id : (String, String, Int)) = remote.actorFor(id._1, id._2, id._3)
val all = nodes.map( id => id -> remoteActorForId(id)).toMap
var live = all //assuming everybody is alive, will fail if not
def reInitIterator = new CyclicIterator(live.values.toList) //TODO: mixin instead?
var iterator = reInitIterator
def hearbeat = {
val dead = all.keySet.diff(live.keySet)
dead.foreach { k => //check if dead are alive
try {
all(k) tryTell Alive
} catch {
case _ =>
}
}
}
def receive = {
case IsAlive(service, hostname, port) =>
EventHandler.debug(this, "got an alive message from a %s at %s:%s" format(service, hostname, port))
live += (service, hostname, port) -> remoteActorForId((service, hostname, port))
iterator = reInitIterator
case m =>
EventHandler.debug(this, "got message. live now: %s" format live)
try {
iterator.next() forward m //try to send to live ones first
} finally {
hearbeat //then see if there are any new live ones
}
}
override def postRestart(reason : Throwable): Unit = {
//TODO: use an exception, e.g. ActorCrashException, as a reason and use it
// to remove dead from the list?
//if no correct replies == assume dead, will be picked up again
live = all.filterNot{ p =>
try {
(p._2 ? Alive).await(100 millis).isCompleted
} catch {
case _ => false
}
}
iterator = reInitIterator
EventHandler.info(this, "was restarted. live now: %s" format live)
}
}
object Nodes {
val One = ("localhost", 1337)
val Two = ("localhost", 6164)
val Three = ("localhost", 1234)
val Four = ("localhost", 6347)
}
object ServerOne extends App {
val node = Nodes.One
remote.start(node._1, node._2)
val dispatcher = actorOf[FTLBDispatcher].start()
remote.register("dispatcher", dispatcher)
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 1, 10),
Supervise(
dispatcher,
Permanent)
:: Nil))
}
object ServerTwo extends App {
val node = Nodes.Two
remote.start(node._1, node._2)
val service = "ponger"
remote.register(service, actorOf(new FTPrinter(service, node._1, node._2)).start())
}
object ServerThree extends App {
val node = Nodes.Three
remote.start(node._1, node._2)
val service = "pinger"
remote.register(service, actorOf(new FTPrinter(service, node._1, node._2)).start())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment