Skip to content

Instantly share code, notes, and snippets.

@patriknw
Created January 22, 2012 12:42
Show Gist options
  • Save patriknw/1656952 to your computer and use it in GitHub Desktop.
Save patriknw/1656952 to your computer and use it in GitHub Desktop.
FaultHandling Sample
package sample
import akka.actor.Actor
import akka.actor.Props
import akka.dispatch.Await
import akka.util.duration._
import akka.util.Timeout
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.FaultHandlingStrategy._
import akka.event.LoggingReceive
import com.typesafe.config.ConfigFactory
import akka.actor.ActorLogging
/**
 * Entry point of the fault-handling sample.
 *
 * Starts an actor system with debug logging enabled, creates a supervised
 * `Counter` actor, sends it a series of increments, asks for the current
 * count from a throw-away actor, and shuts the system down after 30 seconds.
 */
object Sample extends App {
  import Counter._
  import Storage._

  // Sets the log level to DEBUG and enables debug logging of received
  // messages and lifecycle events. The literal is kept flush-left so the
  // parsed text stays exactly as it was.
  val config = ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.actor.debug {
receive = on
lifecycle = on
}
""")

  val system = ActorSystem("Sample", config)

  // Strategy handed to the Props of the top-level counter: a StorageException
  // is reported on stdout and answered with Restart.
  // NOTE(review): withinTimeRange is presumably milliseconds — confirm against the Akka version in use.
  val strategy = OneForOneStrategy(
    {
      case e: StorageException ⇒
        println("## Got it outside Counter" + e)
        Restart
    }: Decider,
    maxNrOfRetries = Some(3),
    withinTimeRange = Some(5000))

  val a = system.actorOf(Props[Counter].withFaultHandler(strategy), name = "counter")

  // One increment of 10 followed by seven increments of 1, so the running
  // count walks through 11..14, the values for which DummyDB.save throws.
  a ! Increment(10)
  (1 to 7) foreach { _ ⇒ a ! Increment(1) }

  // Anonymous actor that requests the current count on creation and logs the reply.
  system.actorOf(Props(new Actor() with ActorLogging {
    implicit def system = context.system

    a ! GetCurrentCount

    def receive = LoggingReceive(this) {
      case CurrentCount(key, count) ⇒
        log.info("Current count for [{}] is [{}]", key, count)
    }
  }))

  // Leave time for the failure/restart/reconnect cycle before shutting down.
  30.seconds.sleep()
  system.shutdown()
}
/**
 * Message protocol of the `Counter` actor.
 *
 * The case classes are `final`: they are plain immutable messages, and
 * extending a case class would break its generated equality and pattern
 * matching.
 */
object Counter {
  /** Request to add `n` to the counter's running count. */
  final case class Increment(n: Int)

  /** Request for the current count; answered with a [[CurrentCount]]. */
  case object GetCurrentCount

  /** Reply to [[GetCurrentCount]]: the counter's `key` and its current `count`. */
  final case class CurrentCount(key: String, count: Long)

  /** Message the counter schedules to itself to re-create its storage child. */
  case object Reconnect
}
/**
 * Actor that keeps a running count and hands every new value to a child
 * `Storage` actor.
 *
 * On start it creates the storage child and loads the previously stored
 * count for its key. When the storage child terminates, the counter keeps
 * counting without storing, and schedules a `Reconnect` to itself 10 seconds
 * later to create a new child and store the current value.
 */
class Counter extends Actor {
  import Counter._
  import Storage._

  // Current total; overwritten in preStart with the value loaded from storage.
  var count = 0L

  // The storage child, or None between its termination and the next Reconnect.
  var storage: Option[ActorRef] = None

  /** Key under which the count is stored: this actor's name. */
  def key = context.self.path.name

  implicit def system = context.system

  /** Creates the storage child and blocks (up to 5 seconds) to load the stored count. */
  override def preStart(): Unit = {
    initStorage()
    for (store ← storage) {
      implicit val timeout = Timeout(5.seconds)
      val reply = Await.result(store ? Get(key), timeout.duration)
      reply match {
        case Entry(k, v) if k == key ⇒ count = v
      }
    }
  }

  /** Creates and watches the child named "storage", supervised with its own restart strategy. */
  def initStorage(): Unit = {
    // FIXME withinTimeRange should be duration
    val strategy = OneForOneStrategy(
      {
        case e: StorageException ⇒
          println("## Got it in Counter" + e)
          Restart
      }: Decider,
      maxNrOfRetries = Some(3),
      withinTimeRange = Some(5000))
    val props = Props[Storage].withFaultHandler(strategy)
    val child = context.actorOf(props, name = "storage")
    storage = Some(context.watch(child))
  }

  /** Sends the current count to the storage child, if there is one. */
  def storeCount(): Unit = {
    // Delegate dangerous work to child
    for (store ← storage) store ! Store(Entry(key, count))
  }

  def receive = LoggingReceive(this) {
    case Increment(n) ⇒
      count += n
      storeCount()

    case GetCurrentCount ⇒
      sender ! CurrentCount(key, count)

    // Only react to the termination of the storage child we currently hold.
    case Terminated(actorRef) if storage.exists(actorRef == _) ⇒
      storage = None
      context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

    case Reconnect ⇒
      initStorage()
      storeCount()
  }
}
/**
 * Message protocol and failure type of the `Storage` actor.
 *
 * The case classes are `final`: they are plain immutable messages, and
 * extending a case class would break its generated equality and pattern
 * matching.
 */
object Storage {
  /** Request to persist `entry`. */
  final case class Store(entry: Entry)

  /** Request for the value stored under `key`; answered with an [[Entry]]. */
  final case class Get(key: String)

  /** A key together with its stored value. */
  final case class Entry(key: String, value: Long)

  /** Failure raised by the backing store; the supervisor strategies in this file match on it. */
  class StorageException(msg: String) extends RuntimeException(msg)
}
/**
 * Actor in front of `DummyDB`.
 *
 * Saves the entry of a `Store` message and answers a `Get` with the stored
 * `Entry`, using 0 when nothing is stored for the key. Exceptions thrown by
 * the database are not caught here.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  implicit def system = context.system

  def receive = LoggingReceive(this) {
    case Store(Entry(key, count)) ⇒
      db.save(key, count)

    case Get(key) ⇒
      // An unknown key is reported as 0.
      val stored = db.load(key) getOrElse 0L
      sender ! Entry(key, stored)
  }
}
/**
 * In-memory stand-in for a database that fails on purpose.
 *
 * Values are held in an immutable map; `save` and `load` are synchronized on
 * this object. Saving any value from 11 to 14 (inclusive) throws a
 * `StorageException` to simulate a store failure.
 */
object DummyDB {
  import Storage.StorageException

  var db = Map[String, Long]()

  /**
   * Stores `value` under `key`.
   *
   * @throws StorageException when `value` is between 11 and 14 inclusive
   */
  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    val simulateFailure = value >= 11 && value <= 14
    if (simulateFailure) {
      println("failure")
      throw new StorageException("Simulated store failure " + value)
    }
    db = db.updated(key, value)
  }

  /** Returns the value stored under `key`, or None when there is none. */
  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment