public
Created

  • Download Gist
FSMExample.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
package examples
 
import akka.actor._
import scala.concurrent.duration._
import scala.collection.immutable
 
// Messages handled by MasterActor
 
// Start makes MasterActor create a Buncher child, register itself as that child's
// target and queue the sample values 10, 11 and 12.
case object Start
 
 
 
// received events

/** Tells the Buncher which actor should receive the batches it produces. */
final case class SetTarget(ref: ActorRef)

/** Asks the Buncher to append `obj` to its pending queue. */
final case class Queue(obj: Any)

/** Asks an Active Buncher to send its pending queue now instead of waiting for the state timeout. */
case object Flush
 
// sent events

/** Everything the Buncher had queued, sent to its target when it goes from Active to Idle. */
final case class Batch(obj: immutable.Seq[Any])
 
// states
/** The states of the Buncher state machine. */
sealed trait State
 
/** Initial state; also the state the Buncher returns to after a Flush or a state timeout. */
case object Idle extends State
/** State entered when an object is queued while a target is set. */
case object Active extends State
 
/** State data carried by the Buncher state machine. */
sealed trait Data

/** Data held before any SetTarget has been handled: there is no target actor yet. */
case object Uninitialized extends Data

/**
 * Data held once a target is known.
 *
 * @param target actor that receives the next Batch
 * @param queue  objects collected so far, in arrival order
 */
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data
 
/**
 * State machine that collects queued objects and delivers them to a target actor as one Batch.
 *
 * - Starts in Idle with Uninitialized data.
 * - SetTarget (while Idle and Uninitialized) stores the target together with an empty queue.
 * - Queue (in any state, once a target is set) appends the object and moves to Active.
 * - Flush, or the one-second state timeout of Active, moves back to Idle with an emptied queue;
 *   the Active -> Idle transition sends the collected objects to the target as a Batch.
 * - The string "exceptionCase" makes the actor throw a RuntimeException.
 */
class Buncher extends Actor with FSM[State, Data] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    // Only the first SetTarget is accepted here: the pattern requires Uninitialized data.
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  // `1.second` rather than the postfix form `1 second`, which needs scala.language.postfixOps.
  when(Active, stateTimeout = 1.second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  whenUnhandled {

    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)

    // Deliberate failure trigger used by the example driver.
    case Event("exceptionCase", _) => throw new RuntimeException
    // case Event(e, s) =>
    // log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
    // stay
  }

  onTransition {
    case Active -> Idle =>
      // The batch is built from stateData, not from the emptied queue passed to goto(Idle).
      stateData match {
        case Todo(ref, queue) => ref ! Batch(queue)
        // Keeps the match exhaustive over the sealed Data type: without a target there is
        // nobody to send a batch to.
        case Uninitialized => ()
      }
  }

  initialize()

  /** Runs the inherited pre-restart handling, then prints the failing message and its cause. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    // Explicit tuple: same output as before, without relying on argument auto-tupling.
    println((message, reason))
  }
}
 
/** Actor that prints every message it receives to standard output. */
class ExampleActor extends Actor {

  // A single catch-all case: whatever arrives is handed to println unchanged.
  override def receive: Receive = {
    case received => println(received)
  }
}
 
/**
 * Parent actor of the example.
 *
 * On Start it creates a Buncher child, registers itself as the child's target and queues three
 * sample values. It prints every Batch it receives and passes any other message on to a child.
 */
class MasterActor extends Actor {

  // Explicit override that simply keeps the inherited supervisor strategy.
  override def supervisorStrategy: SupervisorStrategy = super.supervisorStrategy

  def receive: Receive = {

    case Start =>
      /** defines child Actors */
      val buncherAkkEx = context.actorOf(Props[Buncher])
      buncherAkkEx ! SetTarget(self)
      buncherAkkEx ! Queue(10)
      buncherAkkEx ! Queue(11)
      buncherAkkEx ! Queue(12)

    case Batch(queue) =>
      println(s"The queue is: $queue")

    case m =>
      // Before the first Start there is no child, and `children.head` would throw
      // NoSuchElementException. Report the message as unhandled in that case.
      context.children.headOption match {
        case Some(child) => child ! m
        case None        => unhandled(m)
      }
  }
}
 
/**
 * Entry point of the example: creates the actor system and a MasterActor, then sends the
 * master Start followed by the string "exceptionCase".
 *
 * The shutdown calls at the bottom are commented out, so nothing in this object stops the
 * actor system.
 */
object FSMExample extends App {

  val system: ActorSystem = ActorSystem("FSMExample")

  // Alternative wiring without MasterActor, kept for reference:
  // val exampleAkk = system.actorOf(Props[ExampleActor])
  // val buncherAkkEx = system.actorOf(Props[Buncher])
  // buncherAkkEx ! SetTarget(exampleAkk)
  // buncherAkkEx ! Flush

  val masterAkk: ActorRef = system.actorOf(Props[MasterActor])

  // Sent in this order: Start first, then the message that triggers the failure.
  List[Any](Start, "exceptionCase").foreach(masterAkk ! _)

  // Thread.sleep(2000)
  // system.shutdown()
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.