Skip to content

Instantly share code, notes, and snippets.

@krasserm
Last active December 17, 2015 23:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krasserm/5692790 to your computer and use it in GitHub Desktop.
Save krasserm/5692790 to your computer and use it in GitHub Desktop.
Eventsourced processors in a parent-child relationship
import java.io.File
import scala.concurrent.duration._
import akka.actor._
import akka.util.Timeout
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps
class Parent extends Actor { this: Receiver with Eventsourced =>
implicit val recoveryTimeout = Timeout(10 seconds)
import context.dispatcher
var received: List[Int] = Nil
var child: Option[ActorRef] = None
def receive = {
case n: Int => {
if (n > 10) {
if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) }
child.foreach(_ ! message /* current event message containing n */)
} else {
received = n :: received
println("parent received so far: " + received.reverse)
}
}
}
def createChildProcessor(pid: Int, cid: Int): ActorRef = {
val childActor = extension.processorOf(Props(new Child with Receiver with Confirm with Eventsourced { val id = pid } ))
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor))
for { // asynchronous, non-blocking recovery
_ <- extension.replay(Seq(ReplayParams(pid)))
_ <- extension.deliver(cid)
} yield ()
childChannel
}
}
class Child extends Actor { this: Receiver =>
var received: List[Int] = Nil
def receive = {
case n: Int => {
received = n :: received
println("child received so far: " + received.reverse)
}
}
}
object Example extends App {
implicit val system = ActorSystem("example")
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example")))
val extension = EventsourcingExtension(system, journal)
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } ))
extension.recover(Seq(ReplayParams(1)))
parent ! Message(6)
parent ! Message(14)
}
import java.io.File
import scala.concurrent.duration._
import akka.actor._
import akka.util.Timeout
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps
class Parent extends Actor { this: Receiver with Eventsourced =>
implicit val recoveryTimeout = Timeout(10 seconds)
import context.dispatcher
var received: List[Int] = Nil
var child: Option[ActorRef] = None
def receive = {
case n: Int => {
if (n > 10) {
if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) }
child.foreach(_ ! message.copy((n, randomPin)))
} else {
received = n :: received
println("parent received so far: " + received.reverse)
}
}
}
def createChildProcessor(pid: Int, cid: Int): ActorRef = {
val childActor = extension.processorOf(Props(new Child with Receiver with Confirm with Eventsourced { val id = pid } ))
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor))
for { // asynchronous, non-blocking recovery
_ <- extension.replay(Seq(ReplayParams(pid)))
_ <- extension.deliver(cid)
} yield ()
childChannel
}
def randomPin: String =
System.currentTimeMillis.toString.takeRight(4)
}
class Child extends Actor { this: Receiver =>
var received: List[Int] = Nil
def receive = {
case (n: Int, pin: String) => {
received = n :: received
println(s"child received so far: ${received.reverse}, sending out random pin: ${pin}")
}
}
}
object Example extends App {
implicit val system = ActorSystem("example")
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example")))
val extension = EventsourcingExtension(system, journal)
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } ))
extension.recover(Seq(ReplayParams(1)))
parent ! Message(6)
parent ! Message(14)
}
import java.io.File
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps
class Parent extends Actor { this: Receiver with Eventsourced =>
implicit val timeout = Timeout(2 seconds)
import context.dispatcher
var received: List[Int] = Nil
var child: Option[ActorRef] = None
def receive = {
case event => {
if (child.isEmpty) { child = Some(createDoorFSM(2, 2)) }
child.foreach { _ ? message onComplete { case r => println(s"response = ${r}") } }
}
}
def createDoorFSM(pid: Int, cid: Int): ActorRef = {
val childActor = extension.processorOf(Props(new Door with Receiver with Confirm with Eventsourced { val id = pid } ))
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor))
for { // asynchronous, non-blocking recovery
_ <- extension.replay(Seq(ReplayParams(pid)))
_ <- extension.deliver(cid)
} yield ()
childChannel
}
}
class Door extends Actor with FSM[String, Int] { this: Receiver =>
val OPENED = "opened"
val CLOSED = "closed"
startWith(CLOSED, 0)
when(CLOSED) {
case Event("open", counter) => {
goto(OPENED) using(counter + 1) replying("door opened")
}
}
when(OPENED) {
case Event("close", counter) => {
goto(CLOSED) using(counter + 1) replying("door closed")
}
}
}
object Example extends App {
implicit val timeout = Timeout(10 seconds)
implicit val system = ActorSystem("example")
import system.dispatcher
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example")))
val extension = EventsourcingExtension(system, journal)
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } ))
extension.recover(Seq(ReplayParams(1)))
parent ! Message("open")
parent ! Message("close")
}
import java.io.File
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps
class Parent extends Actor { this: Receiver with Eventsourced =>
implicit val timeout = Timeout(2 seconds)
import context.dispatcher
var received: List[Int] = Nil
var child: Option[ActorRef] = None
val childProcessorId = 2
val childChannelId = 2
def receive = {
case event => {
if (child.isEmpty) { child = Some(createDoorFSM(childProcessorId, childChannelId)) }
child.foreach { c =>
// Only ask (?) channel if current message is not a confirmed message
// because confirmed messages will be dropped by the channel and no
// reply will be made.
if (!message.acks.contains(childChannelId)) c ? message onComplete { case r => println(s"response = ${r}") }
}
}
}
def createDoorFSM(pid: Int, cid: Int): ActorRef = {
val childActor = extension.processorOf(Props(new Door with Receiver with Confirm with Eventsourced { val id = pid } ))
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor))
for { // asynchronous, non-blocking recovery
_ <- extension.replay(Seq(ReplayParams(pid)))
_ <- extension.deliver(cid)
} yield ()
childChannel
}
}
class Door extends Actor with FSM[String, Int] { this: Receiver =>
val OPENED = "opened"
val CLOSED = "closed"
startWith(CLOSED, 0)
when(CLOSED) {
case Event("open", counter) => {
goto(OPENED) using(counter + 1) replying("door opened")
}
}
when(OPENED) {
case Event("close", counter) => {
goto(CLOSED) using(counter + 1) replying("door closed")
}
}
}
object Example extends App {
implicit val timeout = Timeout(10 seconds)
implicit val system = ActorSystem("example")
import system.dispatcher
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example")))
val extension = EventsourcingExtension(system, journal)
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } ))
extension.recover(Seq(ReplayParams(1)))
parent ! Message("open")
parent ! Message("close")
}
import java.io.File
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.util.{Success, Failure}
import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps
case class Initialized(child: ActorRef)
class Parent extends Actor {
import context.dispatcher
val extn = EventsourcingExtension(context.system)
val pid = 1
var child: ActorRef = _
// buffer for messages received during
// asynchronous recovery of child actor
var buffer: List[(Any, ActorRef)] = Nil
def receive = {
case msg => {
recoverDoorFSM(pid) onComplete {
case Success(c) => self ! Initialized(c)
case Failure(_) => // handle recovery failure ...
}
context.become(initializing) // or become(initializing) if this actor is Eventsourced
buffer = (msg, sender) :: buffer
}
}
val initializing: Receive = {
case Initialized(c) => {
context.become(active) // or become(active) if this actor is Eventsourced
child = c
buffer.reverse.foreach { case (msg, sdr) => child tell (Message(msg), sdr) }
buffer = Nil
}
case msg => buffer = (msg, sender) :: buffer
}
val active: Receive = {
case msg => child forward Message(msg)
}
def recoverDoorFSM(pid: Int): Future[ActorRef] = {
implicit val timeout = Timeout(10 seconds)
val childActor = extn.processorOf(ProcessorProps(pid, pid => new Door with Receiver with Eventsourced { val id = pid } ))
extn.replay(Seq(ReplayParams(pid))).map(_ => childActor) // asynchronous, non-blocking recovery (no channel activation needed here)
}
}
class Door extends Actor with FSM[String, Int] { this: Receiver with Eventsourced =>
implicit val timeout = Timeout(3 seconds)
import context.dispatcher
val AWAITING = "awaiting"
val OPENED = "opened"
val CLOSED = "closed"
val extn = EventsourcingExtension(context.system)
var child: ActorRef = _
startWith(CLOSED, 0)
when(CLOSED) {
case Event("open", counter) => {
val replyTo = sender
if (!message.acks.contains(id)) { // avoid sending confirmed message to channel
val ftr = for { result <- child ? message.copy("open") } yield Message((result, replyTo))
pipe(ftr) to self
}
goto(AWAITING)
}
}
when(OPENED) {
case Event("close", counter) => {
val replyTo = sender
if (!message.acks.contains(id)) { // avoid sending confirmed message to channel
val ftr = for { result <- child ? message.copy("close") } yield Message((result, replyTo))
pipe(ftr) to self
}
goto(AWAITING)
}
}
when(AWAITING) {
case Event(("open-result", replyTo: ActorRef), counter) => {
replyTo ! "door opened"
goto(OPENED) using(counter + 1)
}
case Event(("close-result", replyTo: ActorRef), counter) => {
replyTo ! "door closed"
goto(CLOSED) using(counter + 1)
}
}
override def preStart() {
// create child actor wrapped by channel (channel id == processor id in this example)
child = extn.channelOf(DefaultChannelProps(id, context.actorOf(Props(new Worker with Receiver with Confirm))))
extn.deliver(id) // channel activation
}
}
class Worker extends Actor {
def receive = {
case event => sender ! s"${event}-result"
}
}
object Example extends App {
implicit val timeout = Timeout(10 seconds)
implicit val system = ActorSystem("example")
import system.dispatcher
val journal: ActorRef = LeveldbJournalProps(new File("target/example")).withNative(false).createJournal
val extension = EventsourcingExtension(system, journal)
val parent: ActorRef = system.actorOf(Props[Parent])
// Door FSM will likely reject close command because
// it is still in AWAITING state for an open-result
//parent ! "open"
//parent ! "close"
// ... therefore, replies from state transitions have
// to be awaited (non-blocking) by application using
// a monadic composition of futures
for {
a <- parent ? "open"
b <- parent ? "close"
} println(s"answers = ${a}, ${b}")
Thread.sleep(3000)
system.shutdown()
system.awaitTermination()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment