Skip to content

Instantly share code, notes, and snippets.

@gseitz
Created August 23, 2012 21:12
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gseitz/3441818 to your computer and use it in GitHub Desktop.
Save gseitz/3441818 to your computer and use it in GitHub Desktop.
Akka FSM Supervision
package akka.actor
import collection.mutable
import akka.actor.FSM.{ CurrentState, Transition, UnsubscribeTransitionCallBack, SubscribeTransitionCallBack }
import akka.routing.{ Deafen, Listen }
/** Message a [[ParentNotification]] actor sends to its parent from its `preRestart` hook. */
case object PreRestart

/** Message a [[ParentNotification]] actor sends to its parent from its `postRestart` hook. */
case object PostRestart

/**
 * Mixin for actors that should report their restart hooks to their parent.
 *
 * NOTE(review): neither hook delegates to a `super` implementation, so whatever the
 * default hooks would otherwise do is skipped — confirm that this is intended.
 */
trait ParentNotification { thisActor: Actor ⇒

  /** Sends [[PreRestart]] to the parent; `reason` and `message` are not inspected. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    context.parent ! PreRestart

  /** Sends [[PostRestart]] to the parent; `reason` is not inspected. */
  override def postRestart(reason: Throwable): Unit =
    context.parent ! PostRestart
}
/**
 * Actor that creates a single child from `targetProps` and acts as its public face.
 *
 * The supervisor subscribes itself to the child's FSM transition callbacks and
 * republishes every `Transition` / `CurrentState` it receives to its own listeners,
 * with itself (not the child) as the event source. Listeners register and unregister
 * with `SubscribeTransitionCallBack` / `Listen` and `UnsubscribeTransitionCallBack` /
 * `Deafen`. Any message that is not part of this protocol is forwarded to the child.
 *
 * @param targetProps Props used to create the supervised child actor
 */
class FSMSupervisor(targetProps: Props) extends Actor {

  /** Listeners that want to be told about the child's state changes. */
  private val transitionCallbacks = mutable.Set[ActorRef]()

  /** Last state reported by the child, or `None` until the first report arrives. */
  private var currentState: Option[Any] = None

  private val target = context.actorOf(targetProps)

  /** Subscribes this supervisor to the child's transition callbacks. */
  override def preStart(): Unit = {
    target ! SubscribeTransitionCallBack(self)
  }

  def receive = {
    case SubscribeTransitionCallBack(listener)   ⇒ subscribe(listener)
    case Listen(listener)                        ⇒ subscribe(listener)
    case UnsubscribeTransitionCallBack(listener) ⇒ unsubscribe(listener)
    case Deafen(listener)                        ⇒ unsubscribe(listener)

    case Transition(`target`, from, to) ⇒
      currentState = Some(to)
      notifyListeners(Transition(self, from, to))

    case CurrentState(`target`, state) ⇒
      // When a state has already been recorded (e.g. after re-subscribing on PreRestart),
      // the child's report is published as a Transition from the recorded state to the
      // reported one; only the very first report is published as a CurrentState.
      val toPublish = currentState.map(c ⇒ Transition(self, c, state)).getOrElse(CurrentState(self, state))
      currentState = Some(state)
      notifyListeners(toPublish)

    // The child announced a restart: subscribe again so that a CurrentState is sent back.
    case PreRestart  ⇒ target ! SubscribeTransitionCallBack(self)
    case PostRestart ⇒ // nothing to do

    case msg ⇒ target forward msg
  }

  /** Registers `listener` and, if a state is already known, sends it the current state. */
  private def subscribe(listener: ActorRef): Unit = {
    transitionCallbacks += listener
    currentState.foreach(st ⇒ listener ! CurrentState(self, st))
  }

  /** Removes `listener` from the set of registered listeners. */
  private def unsubscribe(listener: ActorRef): Unit = {
    transitionCallbacks -= listener
  }

  /**
   * Sends `msg` to every registered listener that is still alive and drops the
   * terminated ones.
   *
   * The listeners are partitioned on an immutable snapshot first: removing elements
   * from the mutable set while iterating over that same set is not supported and
   * could cause live listeners to be skipped.
   */
  private def notifyListeners(msg: AnyRef): Unit = {
    val (terminated, alive) = transitionCallbacks.toList.partition(_.isTerminated)
    transitionCallbacks --= terminated
    alive.foreach(_ ! msg)
  }
}
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.testkit._
import TestEvent.Mute
import scala.concurrent.util.duration._
import akka.event._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import akka.util.Timeout
import scala.concurrent.util.Duration
/** Fixtures shared by the tests of the `FSMSupervisionSpec` class. */
object FSMSupervisionSpec {

  /** Upper bound used when awaiting the test latches. */
  val timeout: Timeout = Timeout(2 seconds)

  /** Bundle of latches the [[Lock]] actor and the tests open to signal progress. */
  class Latches(implicit system: ActorSystem) {
    val unlockedLatch = TestLatch()
    val lockedLatch = TestLatch()
    val unhandledLatch = TestLatch()
    val terminatedLatch = TestLatch()
    val transitionLatch = TestLatch()
    val initialStateLatch = TestLatch()
    val transitionCallBackLatch = TestLatch()
  }

  /** State data of the [[Lock]]: the digits entered so far and the code to match. */
  case class CodeState(soFar: String, code: String)

  /** States of the [[Lock]] FSM. */
  sealed trait LockState
  case object Locked extends LockState
  case object Open extends LockState

  /**
   * Code lock FSM used as the supervised actor in the tests.
   *
   * @param code    digit sequence that opens the lock
   * @param timeout state timeout set on the `Open` state when the lock is opened
   * @param latches latches opened as the lock passes through its life cycle
   */
  class Lock(code: String, timeout: Duration, latches: Latches)
    extends Actor with FSM[LockState, CodeState] with ParentNotification {

    import latches._

    startWith(Locked, CodeState("", code))

    when(Locked) {
      case Event(digit: Char, CodeState(soFar, code)) ⇒
        val attempt = soFar + digit
        if (attempt.length < code.length) {
          // not enough digits yet: keep collecting
          stay using CodeState(attempt, code)
        } else if (attempt == code) {
          doUnlock()
          goto(Open) using CodeState("", code) forMax timeout
        } else {
          // wrong code: start over
          stay using CodeState("", code)
        }
      case Event("hello", _) ⇒ stay replying "world"
      case Event("bye", _)   ⇒ stop(FSM.Shutdown)
    }

    when(Open) {
      case Event(StateTimeout, _) ⇒
        doLock()
        goto(Locked)
    }

    whenUnhandled {
      case Event(msg, _) ⇒
        log.warning(s"unhandled event $msg in state $stateName with data $stateData")
        unhandledLatch.open
        stay
    }

    onTransition {
      case Locked -> Open ⇒ transitionLatch.open
    }

    // verify that old-style does still compile
    onTransition(transitionHandler _)

    /** Intentionally empty; exists only for the old-style `onTransition` call above. */
    def transitionHandler(from: LockState, to: LockState): Unit = {
      // dummy
    }

    onTermination {
      case StopEvent(FSM.Shutdown, Locked, _) ⇒
        // stop is called from lockstate with shutdown as reason...
        terminatedLatch.open
    }

    // initialize the lock
    initialize

    /** Signals that the lock has been locked again. */
    private def doLock(): Unit = {
      lockedLatch.open
    }

    /** Signals that the lock has been opened. */
    private def doUnlock(): Unit = {
      unlockedLatch.open
    }
  }
}
/**
 * Tests for `FSMSupervisor`: message forwarding to the supervised FSM, republishing of
 * its transitions, termination handling, and subscriptions surviving a restart.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FSMSupervisionSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with ImplicitSender {
  import FSMSupervisionSpec._

  "An FSM Actor Supervisor" must {

    "let the FSM Actor unlock the lock" in {
      import FSM.{ Transition, CurrentState, SubscribeTransitionCallBack }
      val latches = new Latches
      import latches._

      // lock that re-locks after being open for 1 second
      lazy val target = Props(new Lock("33221", 1 second, latches))
      val lock = system.actorOf(Props(new FSMSupervisor(target)))

      // listener that opens a latch for each kind of notification it receives
      val transitionTester = system.actorOf(Props(new Actor {
        def receive = {
          case Transition(_, _, _)     ⇒ transitionCallBackLatch.open
          case CurrentState(_, Locked) ⇒ initialStateLatch.open
        }
      }))

      lock ! SubscribeTransitionCallBack(transitionTester)
      Await.ready(initialStateLatch, timeout.duration)

      // enter the correct code, one digit per message
      lock ! '3'
      lock ! '3'
      lock ! '2'
      lock ! '2'
      lock ! '1'

      Await.ready(unlockedLatch, timeout.duration)
      Await.ready(transitionLatch, timeout.duration)
      Await.ready(transitionCallBackLatch, timeout.duration)
      Await.ready(lockedLatch, timeout.duration)

      EventFilter.warning(start = "unhandled event", occurrences = 1) intercept {
        lock ! "not_handled"
        Await.ready(unhandledLatch, timeout.duration)
      }

      val answerLatch = TestLatch()
      object Hello
      object Bye
      // talks to the lock from inside an actor so that the "world" reply comes back to it
      val tester = system.actorOf(Props(new Actor {
        def receive = {
          case Hello   ⇒ lock ! "hello"
          case "world" ⇒ answerLatch.open
          case Bye     ⇒ lock ! "bye"
        }
      }))
      tester ! Hello
      Await.ready(answerLatch, timeout.duration)

      tester ! Bye
      Await.ready(terminatedLatch, timeout.duration)
    }

    "run onTermination upon ActorRef.stop() on the supervisor" in {
      val started = TestLatch(1)
      // kept in a lazy val so that the expectation below can refer to `fsm.StopEvent`
      lazy val fsm = new Actor with FSM[Int, Null] with ParentNotification {
        override def preStart = { started.countDown }
        startWith(1, null)
        when(1) { FSM.NullFunction }
        onTermination {
          case x ⇒ testActor ! x
        }
      }
      val supervisor = system.actorOf(Props(new FSMSupervisor(Props(fsm))))
      Await.ready(started, timeout.duration)
      system.stop(supervisor)
      expectMsg(1 second, fsm.StopEvent(FSM.Shutdown, 1, null))
    }

    // The enclosing `must` block already contributes the word "must" to the reported
    // test name, so it is not repeated here.
    "keep subscriptions in case of a restart of the FSM actor" in {
      import FSM._
      val started = TestLatch(1)
      val supervisor = system.actorOf(Props(new FSMSupervisor(Props(new Actor with FSM[Int, Null] with ParentNotification {
        override def preStart = { started.countDown }
        startWith(1, null)
        when(1) { case Event(2, _) ⇒ goto(2) }
        // any message received in state 2 makes the actor fail
        when(2) { case Event(_, _) ⇒ sys.error("Not accepting any messages") }
      }))))
      Await.ready(started, timeout.duration)
      supervisor ! SubscribeTransitionCallBack(testActor)
      supervisor ! 2
      supervisor ! 3
      expectMsg(1 second, CurrentState(supervisor, 1))
      expectMsg(1 second, Transition(supervisor, 1, 2))
      // after the failure the supervisor reports the move back to the initial state
      expectMsg(1 second, Transition(supervisor, 2, 1))
    }
  }
}
akka > akka-actor-tests/test:test-only akka.actor.FSMSupervisionSpec
[info] FSMSupervisionSpec:
[info] An FSM Actor Supervisor
[info] - must let the FSM Actor unlock the lock (1 second, 178 milliseconds)
[info] - must run onTermination upon ActorRef.stop() on the supervisor (13 milliseconds)
[ERROR] [08/23/2012 23:11:15.918] [FSMSupervisionSpec-akka.actor.default-dispatcher-3] [akka://FSMSupervisionSpec/user/$e/$a] Not accepting any messages
java.lang.RuntimeException: Not accepting any messages
at scala.sys.package$.error(package.scala:27)
at akka.actor.FSMSupervisionSpec$$anonfun$4$$anonfun$apply$mcV$sp$3$$anonfun$15$$anonfun$apply$2$$anon$2$$anonfun$9.applyOrElse(FSMSupervisionSpec.scala:190)
at akka.actor.FSMSupervisionSpec$$anonfun$4$$anonfun$apply$mcV$sp$3$$anonfun$15$$anonfun$apply$2$$anon$2$$anonfun$9.applyOrElse(FSMSupervisionSpec.scala:190)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:37)
at akka.actor.FSM$class.processEvent(FSM.scala:568)
at akka.actor.FSMSupervisionSpec$$anonfun$4$$anonfun$apply$mcV$sp$3$$anonfun$15$$anonfun$apply$2$$anon$2.processEvent(FSMSupervisionSpec.scala:186)
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:562)
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:556)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:388)
at akka.actor.ActorCell.invoke(ActorCell.scala:364)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230)
at akka.dispatch.Mailbox.run(Mailbox.scala:212)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
[info] - must must keep subscriptions in case of a restart of the FSM actor (10 milliseconds)
[info] Passed: : Total 3, Failed 0, Errors 0, Passed 3, Skipped 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment