Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
CQRS and EventSourcing using Akka Actors
import scala.collection.mutable.ListBuffer
import akka.actor.{Actor,ActorRef}
import akka.actor.Actor._
import akka.routing.{ Listeners, Listen }
//Represents a domain event
trait Event
//A builder to create domain entities
trait EntityBuilder[Entity] {
def build: Entity
}
//An EventStorage loads and stores events
trait EventStorage {
protected def saveEvents(events: Event*): Unit
protected def loadEvents(): Seq[Event]
}
//An EventSource needs to be mixed in together with an EventStorage,
//its responsibility is to facilitate construction of an entity given its history of events,
//and also to apply new events to the entity
trait EventSource[Entity,Builder <: EntityBuilder[Entity]] { self: EventStorage =>
private var cached: Tuple2[Builder, Entity] = cache(newBuilder) //What we cache here is an instance of the entity builder and what the entity currently looks like
protected def newBuilder: Builder //Has to be implemented, returns a newly constructed empty builder
protected def applyEvent(builder: Builder, event: Event): Builder //Has to be implemented, applies the supplied event to the supplied builder, yielding a new builder
//A unit of work is demarcation for atomically creating zero or more events that should be applied to the entity
//Add all Events created in the unit of work to the supplied ListBuffer, those events will then be applied to the entity and returned as the result of the unit of work
def unitOfWork(work: ListBuffer[Event] => Unit): Seq[Event] = {
val buffer = new ListBuffer[Event]
work(buffer)
applyEvents(buffer:_*)
buffer
}
//Returns the current instance of the eventsourced entity
def entity = cached._2
//replay loads and applies all events for the entity and caches the results
def replay(): Unit =
cached = cache(load())
protected def cache(builder: Builder): (Builder, Entity) = (builder, builder.build)
//Loads all events and applies them to a new builder and returns the resulting builder
protected def load(): Builder = loadEvents().foldLeft(newBuilder)(applyEvent)
//First saves the events and then applies the events towards the cached builder
protected def applyEvents(events: Event*) {
saveEvents(events:_*)
cached = cache(events.foldLeft(cached._1)(applyEvent _))
}
}
trait InMemoryEventStorage extends EventStorage {
//All events in the order of occurence
private var eventHistory: List[Event] = Nil
/**
* Saves the events in the order they occurred
*/
protected def saveEvents(events: Event*) {
eventHistory = eventHistory ::: events.toList
}
/**
* Loads the events in the order they occurred
*/
protected def loadEvents(): Seq[Event] = {
eventHistory
}
}
/**
* EventStreamActor receives events and publishes it to it's listeners
*/
class EventStreamActor(id: String) extends Actor with Listeners {
self.id = id
def receive = listenerManagement orElse {
case e: Event => gossip(e)
}
}
/**
* Represents an Actor that handles Commands and publishes Events while representing an Entity of type E with a corresponding EntityBuilder of type B
*/
trait ActorEventSourcing[E,B <: EntityBuilder[E]] extends Actor with EventSource[E, B] { self: EventStorage =>
def receive = commandHandling
//Publishes the sequence of Events to the listeners
def publish(events: Seq[Event]) { for(e <- events) eventStream ! e }
//Defines where events are published
def eventStream: ActorRef
//Defines the command handlers
def commandHandling: Receive
}
/**
*
* Alright, enough of handwaiving, here's some sample code
*
**/
//Represents a command to change a customer name
case class ChangeCustomerNameCommand(customerId: Long, name: String)
//Represents the event that a customer name was changed
case class ChangedName(customerId: Long, name: String) extends Event
//Customer is an entity object (immutable)
case class Customer(customerId: Long, age: Int, name: String)
//A builder used to create Customers from a sequence of Events
case class CustomerBuilder(customerId: Long, age: Int, name: String) extends EntityBuilder[Customer] {
def build = Customer(customerId, age, name)
}
object CustomerAR {
val eventStream = actorOf(new EventStreamActor("eventstream:customerAR")).start
}
//An Actor that represents a Customer, which is an Actor that processed commands and generates events that is applied to it's Customer which is stored in memory
class CustomerAR(val customerId: Long) extends ActorEventSourcing[Customer,CustomerBuilder] with InMemoryEventStorage {
def newBuilder = CustomerBuilder(customerId, 0, "")
val eventStream = CustomerAR.eventStream
//This is how events are applied to a CustomerBuilder, it's used to progress state and enable replays of events
def applyEvent(builder: CustomerBuilder, event: Event) : CustomerBuilder = event match {
case e@ChangedName(`customerId`, newName) => log.slf4j.debug("applyingEvent: {}",e); builder.copy(name = newName)
}
//This method defines how it reacts to Commands
def commandHandling = {
case ChangeCustomerNameCommand(`customerId`, newName) => //When it gets a name change command targeted at the corred customer id
publish( //Publish the resulting events
unitOfWork { //of the following unit of work
events => {
//If you need to check something on the instance of the entity you can call "entity"
//if (entity.name == "Chuck Norris") ...
events += ChangedName(customerId, newName) //Create the name change event
}
}
)
}
}
//Test code:
val debugListener = actorOf( new Actor { def receive = { case event => log.slf4j.debug("busListener received event: {}", event) } } ).start
CustomerAR.eventStream ! Listen(debugListener)
val customer = actorOf( new CustomerAR(55) ).start
customer ! ChangeCustomerNameCommand(55, "Jesus")
@mbbx6spp

This comment has been minimized.

Copy link

mbbx6spp commented Jan 30, 2012

Can you remember which version of Akka you used for this? Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.