Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import akka.actor._
import akka.persistence._
/**
 * Runnable sketch of an event-sourced processor built directly on
 * akka-persistence's `Processor`.
 *
 * Every incoming command is turned into two events. Each event is sent back
 * to the processor wrapped in `Persistent`; its handler runs only when that
 * `Persistent` message is received again. Until then all other messages are
 * stashed.
 */
object EventsourcingExample extends App {

  class EventsourcedProcessor extends Processor {
    // State: every applied event, newest first.
    var events: List[Any] = Nil

    // Behaviors that are still waiting for "their" persisted event,
    // newest first (the oldest one is the last element).
    var awaitStack: List[Actor.Receive] = Nil

    def receive: Receive = {
      // During recovery replayed events only update the state.
      case Persistent(event, _) if recoveryRunning => update(event)
      // Anything else is treated as a command that yields two events.
      case cmd =>
        handle(s"${cmd}-1")(handler)
        handle(s"${cmd}-2")(handler)
    }

    /** Prepends `event` to the processor state. */
    def update(event: Any): Unit = {
      events = event :: events
    }

    /** Event handler: prints the event and the current state, then applies it. */
    val handler: Actor.Receive = {
      case event =>
        println(s"applying ${event} to current state = ${events.reverse}")
        update(event)
    }

    /**
     * Sends `event` to this processor as a `Persistent` message and runs
     * `handler` once that message is received back.
     *
     * @param event   the event to persist
     * @param handler callback invoked with `event` after it was received
     *                back as `Persistent(event, _)`
     */
    def handle[A](event: A)(handler: A => Unit): Unit = {
      val awaitingPersistence: Receive = {
        case Persistent(`event`, _) =>
          context.unbecome()
          // The behavior just removed from the top of the behavior stack is
          // the OLDEST pending one, i.e. the last element of awaitStack.
          // (Dropping the head instead would leave a stale entry that is
          // re-installed if a handler calls handle again.)
          awaitStack = awaitStack.dropRight(1)
          unstashAll()
          handler(event)
        case _ => stash()
      }

      // revert awaitingPersistence behaviours on the actor
      // behavior stack to process events in correct order
      awaitStack.foreach(_ => context.unbecome())
      awaitStack = awaitingPersistence :: awaitStack
      // Pushing newest-first leaves the oldest behavior on top.
      awaitStack.foreach(context.become(_, discardOld = false))

      self forward Persistent(event)
    }
  }

  val system = ActorSystem("example")
  val processor = system.actorOf(Props[EventsourcedProcessor])

  processor ! "foo"
  processor ! "bar"

  // Demo only: give the processor some time before shutting down.
  Thread.sleep(1000)
  system.shutdown()
}
/**
 * Support for eventsourced processors. Will become
 * part of akka-persistence ...
 *
 * Implementations supply `recoveryBehavior` (applied to the payload of
 * `Persistent` messages received while recovery is running) and
 * `defaultBehavior` (applied to every other message). From within
 * `defaultBehavior`, `persist` is used to persist an event and to run a
 * handler once the event has been persisted.
 */
trait EventsourcedProcessor extends Processor {
  // Behaviors that are still waiting for "their" persisted event,
  // newest first (the oldest one is the last element).
  private var awaitStack: List[Actor.Receive] = Nil

  /**
   * Sends `event` to this processor as a `Persistent` message and runs
   * `handler` once that message is received back. Until then, every other
   * message is stashed.
   *
   * @param event   the event to persist
   * @param handler callback invoked with `event` after it was received
   *                back as `Persistent(event, _)`
   */
  final def persist[A](event: A)(handler: A => Unit): Unit = {
    val awaitingPersistence: Receive = {
      case PersistenceFailure(`event`, _, cause) =>
        throw cause // TODO: refine
      case Persistent(`event`, _) =>
        context.unbecome()
        // The behavior just removed from the top of the behavior stack is
        // the OLDEST pending one, i.e. the last element of awaitStack.
        // (Dropping the head instead would leave a stale entry that is
        // re-installed if a handler calls persist again.)
        awaitStack = awaitStack.dropRight(1)
        unstashAll()
        handler(event)
      case _ => stash()
    }

    // Revert awaitingPersistence ordering on
    // the processor actor's behavior stack.
    // Needed to handle persisted events in
    // correct order.
    awaitStack.foreach(_ => context.unbecome())
    awaitStack = awaitingPersistence :: awaitStack
    // Pushing newest-first leaves the oldest behavior on top.
    awaitStack.foreach(context.become(_, discardOld = false))

    self forward Persistent(event)
  }

  /**
   * Routes replayed `Persistent` payloads to `recoveryBehavior` while
   * recovery is running and everything else to `defaultBehavior`.
   */
  final def receive = {
    case Persistent(payload, _) if recoveryRunning => recoveryBehavior(payload)
    case msg => defaultBehavior(msg)
  }

  /** Applied to the payload of `Persistent` messages during recovery. */
  def recoveryBehavior: Actor.Receive

  /** Applied to all messages that are not handled as recovery messages. */
  def defaultBehavior: Actor.Receive
}
// ------------------------------------------------------------------
// Usage examples
// ------------------------------------------------------------------
// Command message; the example processors match on it in defaultBehavior.
case class Cmd(data: Any)
// Event message; the example processors persist it and apply it via updateSate.
case class Evt(data: Any)
/**
 * Base trait of all example processors.
 *
 * Keeps the payloads of all handled `Evt`s in `events` (newest first) and
 * uses `updateSate` as the recovery behavior.
 */
trait ExampleProcessor extends EventsourcedProcessor {
  // state is the list of all handled events
  var events: List[Any] = Nil

  // update state by handling replayed events
  // (identifier spelling kept as is: other processors refer to it by this name)
  val updateSate: Actor.Receive = {
    case Evt(data) => events = data :: events
  }

  // executed during recovery
  def recoveryBehavior: Actor.Receive = updateSate
}
/**
 * Receive command, then persist and handle event.
 */
class ExampleProcessor1 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    // Triggered by (transient) cmd
    //
    // - create Evt("foo-1")
    // - persist event (asynchronously)
    // - handle event after successful persistence
    //   (may close over processor state)
    //
    // It is guaranteed that no other messages are
    // processed in between calling persist and
    // execution of the event handler (= updateSate)
    //
    case Cmd("foo") => persist(Evt("foo-1"))(updateSate)
  }
}
/**
 * Notify listeners after successful persistence and handling of event.
 */
class ExampleProcessor2 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    case Cmd("foo") =>
      persist(Evt("foo-1")) { event =>
        updateSate(event)
        // Event handler not executed during recovery,
        // so we can notify listeners here
        context.system.eventStream publish event
      }
  }
}
/**
 * Persist and handle more than one event per command.
 */
class ExampleProcessor3 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    // Both events are persisted for the same command; each one is
    // applied with updateSate once it has been persisted.
    case Cmd("foo") =>
      persist(Evt("foo-1"))(updateSate)
      persist(Evt("foo-2"))(updateSate)
  }
}
/**
 * Change eventsourced processor behavior in event handler.
 */
class ExampleProcessor4 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    case Cmd("foo") =>
      persist(Evt("foo-1"))(updateSate)
      persist(Evt("foo-2")) { event =>
        updateSate(event)
        // switch to barBehavior once the second event was handled
        context.become(barBehavior)
      }
  }

  val barBehavior: Actor.Receive = {
    case Cmd("bar") =>
      persist(Evt("bar-1")) { event =>
        updateSate(event)
        // switch to defaultBehavior
        context.unbecome()
      }
  }
}
/**
 * Mix event sourcing and command sourcing
 * (possible but not recommended though ...)
 */
class ExampleProcessor5 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    // transient command -> persistent event
    case Cmd("foo") => persist(Evt("foo-1"))(updateSate)
    // persistent command
    case Persistent(cmd: Cmd, _) => recoveryBehavior(cmd)
  }

  // requires overriding recoveryBehavior to handle replayed commands
  override def recoveryBehavior: Actor.Receive = updateSate orElse {
    case Cmd(data) => // ...
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment