Created
January 22, 2012 12:42
-
-
Save patriknw/1656952 to your computer and use it in GitHub Desktop.
FaultHandling Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample | |
import akka.actor.Actor | |
import akka.actor.Props | |
import akka.dispatch.Await | |
import akka.util.duration._ | |
import akka.util.Timeout | |
import akka.actor.ActorRef | |
import akka.actor.Terminated | |
import akka.actor.ActorSystem | |
import akka.actor.OneForOneStrategy | |
import akka.actor.FaultHandlingStrategy._ | |
import akka.event.LoggingReceive | |
import com.typesafe.config.ConfigFactory | |
import akka.actor.ActorLogging | |
/**
 * Entry point of the fault-handling sample.
 *
 * Builds an actor system with debug logging switched on, creates a [[Counter]]
 * actor named "counter" under a one-for-one strategy, sends it a burst of
 * increments, queries the resulting count from a small logging actor, and shuts
 * the system down after a fixed 30 second pause.
 */
object Sample extends App {
  import Counter._
  import Storage._

  // Turn on DEBUG level plus logging of received messages and lifecycle events.
  val config = ConfigFactory.parseString("""
    akka.loglevel = DEBUG
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("Sample", config)

  // Strategy handed to the top-level counter: a StorageException is printed and
  // answered with Restart; at most 3 retries within a range of 5000.
  val strategy = {
    val onStorageFailure: Decider = {
      case e: StorageException =>
        println("## Got it outside Counter" + e)
        Restart
    }
    OneForOneStrategy(onStorageFailure, maxNrOfRetries = Some(3), withinTimeRange = Some(5000))
  }

  val a = system.actorOf(Props[Counter].withFaultHandler(strategy), name = "counter")

  // One increment of 10 followed by seven increments of 1; DummyDB rejects the
  // values 11 to 14, so this sequence walks through the simulated failure window.
  a ! Increment(10)
  (1 to 7) foreach { _ => a ! Increment(1) }

  // Throw-away actor: asks the counter for its value while being constructed and
  // logs the CurrentCount reply it receives.
  system.actorOf(Props(new Actor() with ActorLogging {
    // NOTE(review): presumably this actor is the implicit sender, so the reply
    // lands in the receive block below - confirm against the Akka version in use.
    a ! GetCurrentCount

    implicit def system = context.system

    def receive = LoggingReceive(this) {
      case CurrentCount(name, total) =>
        log.info("Current count for [{}] is [{}]", name, total)
    }
  }))

  // Give the sample time to play out before stopping the actor system.
  30.seconds.sleep()
  system.shutdown()
}
/**
 * Message protocol of the `Counter` actor.
 *
 * The case classes are `final` so that the protocol cannot be extended by
 * subclassing a message type.
 */
object Counter {
  /** Asks the counter to add `n` to its current value. */
  final case class Increment(n: Int)

  /** Asks the counter for its current value; answered with a [[CurrentCount]]. */
  case object GetCurrentCount

  /** Reply to [[GetCurrentCount]]: the counter's `key` and its current `count`. */
  final case class CurrentCount(key: String, count: Long)

  /** Message the counter schedules to itself to re-create its storage child. */
  case object Reconnect
}
/**
 * Actor that keeps a running count and delegates persisting it to a child
 * `Storage` actor.
 *
 * On start it creates the storage child and loads the previously stored value
 * for its own key. When the watched storage child is reported terminated, the
 * counter keeps counting in memory and schedules a `Reconnect` to itself, on
 * which it re-creates the child and stores the current value.
 */
class Counter extends Actor {
  import Counter._
  import Storage._

  /** Current value of the counter. */
  var count = 0L

  /** The storage child, or `None` while no storage is available. */
  var storage: Option[ActorRef] = None

  /** Key under which the count is stored: the name of this actor's path. */
  def key = context.self.path.name

  implicit def system = context.system

  /** Creates the storage child and blocks (up to 5 seconds) to load the stored count. */
  override def preStart(): Unit = {
    initStorage()
    for (ref <- storage) {
      implicit val timeout = Timeout(5.seconds)
      val reply = Await.result(ref ? Get(key), timeout.duration)
      // Only an Entry for our own key is accepted; any other reply is not matched.
      reply match {
        case Entry(k, v) if k == key => count = v
      }
    }
  }

  /** Creates and watches the child actor named "storage" and remembers it in `storage`. */
  def initStorage(): Unit = {
    // FIXME withinTimeRange should be duration
    val decider: Decider = {
      case e: StorageException =>
        println("## Got it in Counter" + e)
        Restart
    }
    val strategy =
      OneForOneStrategy(decider, maxNrOfRetries = Some(3), withinTimeRange = Some(5000))
    val child = context.actorOf(Props[Storage].withFaultHandler(strategy), name = "storage")
    storage = Some(context.watch(child))
  }

  /** Sends the current count to the storage child, if there is one. */
  def storeCount(): Unit = {
    // Delegate dangerous work to child
    for (ref <- storage) ref ! Store(Entry(key, count))
  }

  def receive = LoggingReceive(this) {
    case Increment(n) =>
      count = count + n
      storeCount()

    case GetCurrentCount =>
      sender ! CurrentCount(key, count)

    // The watched storage child is gone: forget it and retry in 10 seconds.
    case Terminated(actorRef) if storage == Some(actorRef) =>
      storage = None
      context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

    // Re-create the storage child and push the value accumulated in the meantime.
    case Reconnect =>
      initStorage()
      storeCount()
  }
}
/**
 * Message protocol and failure type of the `Storage` actor.
 *
 * The case classes are `final` so that the protocol cannot be extended by
 * subclassing a message type.
 */
object Storage {
  /** Asks the storage to persist `entry`. */
  final case class Store(entry: Entry)

  /** Asks the storage for the value of `key`; answered with an [[Entry]]. */
  final case class Get(key: String)

  /** A stored key/value pair; also used as the reply to [[Get]]. */
  final case class Entry(key: String, value: Long)

  /** Failure raised by the backing store when an operation fails. */
  class StorageException(msg: String) extends RuntimeException(msg)
}
/**
 * Actor that performs the actual reads and writes against `DummyDB`.
 *
 * A `Store` message saves the contained entry; a `Get` message is answered
 * with an `Entry` holding the stored value, or 0 when nothing is stored for
 * the key. Exceptions thrown by the database are not caught here.
 */
class Storage extends Actor {
  import Storage._

  /** The backing store used by this actor. */
  val db = DummyDB

  implicit def system = context.system

  def receive = LoggingReceive(this) {
    case Store(Entry(name, amount)) =>
      db.save(name, amount)

    case Get(name) =>
      val stored = db.load(name).getOrElse(0L)
      sender ! Entry(name, stored)
  }
}
/**
 * In-memory stand-in for a database, with a built-in failure window used to
 * exercise the sample's fault handling.
 *
 * Both operations run inside `synchronized` on this object.
 */
object DummyDB {
  import Storage.StorageException

  /** Current contents: an immutable map that is replaced on every save. */
  var db = Map[String, Long]()

  /**
   * Stores `value` under `key`.
   *
   * Values from 11 to 14 (inclusive) are rejected: "failure" is printed and a
   * `StorageException` is thrown, leaving the map unchanged.
   */
  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    val inFailureWindow = value >= 11 && value <= 14
    if (inFailureWindow) {
      println("failure")
      throw new StorageException("Simulated store failure " + value)
    }
    db = db + (key -> value)
  }

  /** Returns the value stored under `key`, or `None` when there is none. */
  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db get key
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment