import akka.persistence._
// Stand-alone example application: declares an event-sourced processor built
// on akka-persistence's Processor and sends it two messages.
// NOTE(review): closing braces and parts of some bodies are not visible in
// this excerpt, so the exact nesting of the definitions below needs confirming.
object EventsourcingExample extends App {
// Processor whose state is the list of events it has applied.
class EventsourcedProcessor extends Processor {
// Applied events, newest first (update prepends).
var events: List[Any] = Nil
// awaitingPersistence behaviors created by handle, newest first (handle prepends).
var awaitStack: List[Actor.Receive] = Nil
def receive = {
// Persistent messages received while recovery is running are applied
// directly to the state via update.
case Persistent(event, _) if recoveryRunning ⇒ update(event)
// Every other message is treated as a command.
// NOTE(review): the body of this case is not visible in this excerpt.
case cmd ⇒ {
// Records `event` by prepending it to `events`.
def update(event: Any) {
events = event :: events
// Handler that prints the received event together with the recorded
// events, oldest first (events.reverse).
val handler: Actor.Receive = {
case event ⇒ {
println(s"applying ${event} to current state = ${events.reverse}")
// Forwards `event` to this actor wrapped in Persistent and installs a
// temporary behavior that waits for that Persistent message.
// NOTE(review): the place where `handler` is invoked is not visible here.
def handle[A](event: A)(handler: A ⇒ Unit): Unit = {
val awaitingPersistence: Receive = {
// The awaited event came back as a Persistent message.
// NOTE(review): tail drops the head of awaitStack, i.e. the most recently
// added behavior, while the behavior on top of the actor's stack is the
// oldest one (see the re-push below) — confirm this is intended.
case Persistent(`event`, sequenceNr) ⇒
awaitStack = awaitStack.tail
// Anything else is stashed while this behavior is active.
case _ ⇒ stash()
// revert awaitingPersistence behaviours on the actor
// behavior stack to process events in correct order
// Step 1: pop one behavior per entry currently tracked in awaitStack.
awaitStack.foreach(_ ⇒ context.unbecome())
// Step 2: track the new behavior at the head of the list.
awaitStack = awaitingPersistence :: awaitStack
// Step 3: push all tracked behaviors in list order without discarding,
// so the last (oldest) list entry ends up on top.
awaitStack.foreach(context.become(_, discardOld = false))
self forward Persistent(event)
// Driver: create an actor system named "example", start the processor
// and send it the messages "foo" and "bar".
val system = ActorSystem("example")
val processor = system.actorOf(Props[EventsourcedProcessor])
processor ! "foo"
processor ! "bar"
/** Support for eventsourced processors. */
// Mixin that adds a `persist(event)(handler)` operation to a Processor and
// routes incoming messages to either `recoveryBehavior` or `defaultBehavior`.
// NOTE(review): closing braces and some body lines are not visible in this
// excerpt; confirm the exact structure against the full file.
trait EventsourcedProcessor extends Processor {
// awaitingPersistence behaviors created by persist, newest first (persist prepends).
private var awaitStack: List[Actor.Receive] = Nil
// Forwards `event` to this actor wrapped in Persistent and installs a
// temporary behavior that waits for the outcome.
// NOTE(review): the place where `handler` is invoked is not visible here.
final def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
val awaitingPersistence: Receive = {
// A PersistenceFailure was received for the awaited event: rethrow its cause.
case PersistenceFailure(`event`, _, cause) ⇒ {
throw cause // TODO: refine
// The awaited event came back as a Persistent message.
// NOTE(review): tail drops the head of awaitStack, i.e. the most recently
// added behavior, while the behavior on top of the actor's stack is the
// oldest one (see the re-push below) — confirm this is intended.
case Persistent(`event`, _) ⇒
awaitStack = awaitStack.tail
// Anything else is stashed while this behavior is active.
case _ ⇒ stash()
// Revert awaitingPersistence ordering on
// the processor actor's behavior stack.
// Needed to handle persisted events in
// correct order.
// Step 1: pop one behavior per entry currently tracked in awaitStack.
awaitStack.foreach(_ ⇒ context.unbecome())
// Step 2: track the new behavior at the head of the list.
awaitStack = awaitingPersistence :: awaitStack
// Step 3: push all tracked behaviors in list order without discarding,
// so the last (oldest) list entry ends up on top.
awaitStack.foreach(context.become(_, discardOld = false))
self forward Persistent(event)
// Base behavior: payloads of Persistent messages received while recovery is
// running go to recoveryBehavior; every other message goes to defaultBehavior.
final def receive = {
case Persistent(payload, _) if recoveryRunning ⇒ recoveryBehavior(payload)
case msg ⇒ defaultBehavior(msg)
// Behavior applied to replayed payloads; supplied by the implementing class.
def recoveryBehavior: Actor.Receive
// Behavior applied to all other messages; supplied by the implementing class.
def defaultBehavior: Actor.Receive
// ------------------------------------------------------------------
// Usage examples
// ------------------------------------------------------------------
/** Command message used by the example processors, e.g. `Cmd("foo")`. */
final case class Cmd(data: Any)
/** Event message persisted by the example processors, e.g. `Evt("foo-1")`. */
final case class Evt(data: Any)
/** Base trait of all example processors. */
// Shared base of the example processors: keeps the handled event data in
// `events` and uses the same update function for recovery.
trait ExampleProcessor extends EventsourcedProcessor {
// state is the list of all handled events
// (newest first, because each handled event's data is prepended)
var events: List[Any] = Nil
// update state by handling replayed events
// NOTE(review): `updateSate` looks like a typo for `updateState`; a rename
// would have to be applied to every subclass that references it.
val updateSate: Actor.Receive = {
// Only Evt messages are handled; the data they carry is prepended to `events`.
case Evt(data) ⇒ events = data :: events
// executed during recovery
def recoveryBehavior = updateSate
/** Receive command, then persist and handle event. */
// Example 1: a single command results in a single persisted event that is
// then applied to the state.
class ExampleProcessor1 extends ExampleProcessor {
val defaultBehavior: Actor.Receive = {
// Triggered by (transient) cmd
// - create Evt("foo-1")
// - persist event (asynchronously)
// - handle event after successful persistence
// (may close over processor state)
// It is guaranteed that no other messages are
// processed in between calling persist and
// execution of the event handler (= updateSate)
case Cmd("foo") ⇒ persist(Evt("foo-1"))(updateSate)
/** Notify listeners after successful persistence and handling of event. */
// Example 2: persists an event and publishes it on the actor system's
// event stream from inside the event handler.
class ExampleProcessor2 extends ExampleProcessor {
val defaultBehavior: Actor.Receive = {
// Cmd("foo") persists Evt("foo-1"); the handler receives the event as `event`.
case Cmd("foo") ⇒ persist(Evt("foo-1")) { event ⇒
// Event handler not executed during recovery,
// so we can notify listeners here
context.system.eventStream publish event
/** Persist and handle more than one event per command. */
// Example 3: reacts to Cmd("foo").
// NOTE(review): the body of the Cmd("foo") case is not visible in this
// excerpt; confirm against the full file what is persisted here.
class ExampleProcessor3 extends ExampleProcessor {
val defaultBehavior: Actor.Receive = {
case Cmd("foo") ⇒ {
/** Change eventsourced processor behavior in event handler. */
// Example 4: defines two behaviors, `defaultBehavior` (reacts to Cmd("foo"))
// and `barBehavior` (reacts to Cmd("bar")).
// NOTE(review): the bodies of both event handlers are not visible in this
// excerpt, so the actual behavior switches should be confirmed in the full file.
class ExampleProcessor4 extends ExampleProcessor {
val defaultBehavior: Actor.Receive = {
case Cmd("foo") ⇒ {
// Persist Evt("foo-2"); the handler receives the event as `event`.
persist(Evt("foo-2")) { event ⇒
val barBehavior: Actor.Receive = {
case Cmd("bar") ⇒ {
// Persist Evt("bar-1"); the handler receives the event as `event`.
persist(Evt("bar-1")) { event ⇒
// switch to defaultBehavior
/**
 * Mix event sourcing and command sourcing
 * (possible but not recommended though ...).
 */
// Example 5: handles both transient commands (turned into persisted events)
// and commands that arrive wrapped in Persistent.
class ExampleProcessor5 extends ExampleProcessor {
val defaultBehavior: Actor.Receive = {
// transient command -> persistent event
case Cmd("foo") ⇒ persist(Evt("foo-1"))(updateSate)
// persistent command
// (passed to recoveryBehavior, the same function that handles replayed messages)
case Persistent(cmd: Cmd, _) ⇒ recoveryBehavior(cmd)
// requires overriding recoveryBehavior to handle replayed commands
// Events are still handled by updateSate; Cmd messages fall through to the
// additional case below.
override def recoveryBehavior = updateSate orElse {
case Cmd(data) ⇒ // ...
