Skip to content

Instantly share code, notes, and snippets.

@krasserm
Created January 20, 2012 16:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krasserm/1648358 to your computer and use it in GitHub Desktop.
Save krasserm/1648358 to your computer and use it in GitHub Desktop.
Building an event-sourced web application with Akka - Part 2
import akka.actor._
import akka.dispatch._
import akka.stm._
import scalaz._
/**
 * A piece of state of type `S` that evolves by folding values of type `A` into it.
 *
 * @tparam S type of the projected state
 * @tparam A type of the values that are projected onto the state
 */
trait Projection[S, A] {
  /**
   * Computes the next state from a `(state, value)` pair. Being a partial
   * function, it may be undefined for some pairs.
   */
  def project: PartialFunction[(S, A), S]

  /** The state this projection starts from. */
  def initialState: S

  /** The state as it currently stands. */
  def currentState: S
}
/**
 * A [[Projection]] whose state lives in an STM `Ref` and is changed only by a
 * private updater actor.
 *
 * Callers submit updates through `transacted`; each update is evaluated against
 * the current state, its events are written to `eventLog`, the result is
 * projected onto the state, and the outcome is delivered through the returned
 * future.
 *
 * @tparam S type of the projected state
 * @tparam A type of the values produced by successful updates
 */
trait UpdateProjection[S, A] extends Projection[S, A] {
  // Both are lazy so that `initialState` (supplied by the concrete subclass)
  // is not read before the subclass has been initialised.
  private lazy val ref = Ref(initialState)
  private lazy val updater = Actor.actorOf(new Updater).start

  /** Log that receives the events of every successful update. */
  def eventLog: EventLog

  /**
   * When `true` (the default) events are written with `eventLog.append` and the
   * updater gets its own thread-based dispatcher; when `false` events are
   * written with `eventLog.appendAsync`.
   */
  def writeAhead: Boolean = true

  /** Reads the state currently held by the `Ref`. */
  def currentState: S = ref()

  /**
   * Submits `update` to the updater actor and returns a future for its outcome.
   *
   * If an STM transaction is active on the calling thread, the state is read
   * inside that transaction and the message is handed to `deferred` instead of
   * being sent immediately (presumably so that it is only sent once the
   * transaction commits -- confirm against the akka.stm documentation).
   *
   * @param update function from the current state to the update to run
   * @return a future completed with the update's validation result, or with an
   *         exception if evaluating, logging or projecting the update throws
   */
  def transacted[B <: A](update: S => Update[Event, B]): Future[DomainValidation[B]] = {
    val promise = new DefaultCompletableFuture[DomainValidation[B]]
    // The actor message is typed on A, so the promise is widened with a cast.
    def dispatch = updater ! ApplyUpdate(update, promise.asInstanceOf[CompletableFuture[DomainValidation[A]]])
    if (Stm.activeTransaction) {
      currentState // join
      deferred(dispatch)
    } else {
      dispatch
    }
    promise
  }

  /** Message carrying an update together with the promise to complete with its outcome. */
  private case class ApplyUpdate(update: S => Update[Event, A], promise: CompletableFuture[DomainValidation[A]])

  /** Actor that applies updates to the projection state. */
  private class Updater extends Actor {
    if (writeAhead) self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)

    def receive = {
      case ApplyUpdate(u, p) =>
        try {
          val current = currentState
          val update = u(current)
          update() match {
            case (events, s @ Success(result)) =>
              // Events are reversed before logging (presumably they were
              // accumulated newest-first -- confirm against Update).
              log(events.reverse)
              ref set project(current, result.asInstanceOf[A])
              p.completeWithResult(s)
            case (_, f) =>
              // Failed validation: nothing is logged and the state is untouched.
              p.completeWithResult(f)
          }
        } catch {
          case e: Exception =>
            // Without this the caller's future would never be completed and
            // could only time out. The exception is rethrown so the actor
            // runtime's own failure handling still sees it.
            p.completeWithException(e)
            throw e
        }
    }

    /** Writes the events to the event log, one by one, in the given order. */
    def log(events: List[Event]) = events.foreach { event =>
      if (writeAhead) eventLog.append(event) else eventLog.appendAsync(event)
    }
  }
}
import akka.dispatch._
/**
 * Invoice service whose projected state is a map from invoice id to invoice.
 * All modifications go through `transacted` of [[UpdateProjection]].
 */
trait InvoiceService extends UpdateProjection[Map[String, Invoice], Invoice] {
  import InvoiceService._

  /** Looks up a single invoice in the current state. */
  def getInvoice(invoiceId: String): Option[Invoice] = currentState get invoiceId

  /** All invoices in the current state. */
  def getInvoices: Iterable[Invoice] = currentState.values

  /**
   * Runs `f` against the invoice stored under `invoiceId`. The update is
   * rejected with a `DomainError` if no invoice is stored under that id.
   */
  def updateInvoice[B <: Invoice](invoiceId: String)(f: Invoice => Update[InvoiceEvent, B]) = transacted { invoices =>
    invoices.get(invoiceId) match {
      case Some(found) => f(found)
      case None => Update.reject(DomainError("invoice %s: does not exist" format invoiceId))
    }
  }

  /**
   * Like `updateInvoice`, but `f` is only applied to a `DraftInvoice`; any
   * other kind of invoice causes the update to be rejected with a `DomainError`.
   */
  def updateDraftInvoice[B <: Invoice](invoiceId: String)(f: DraftInvoice => Update[InvoiceEvent, B]) =
    updateInvoice(invoiceId) {
      case draft: DraftInvoice => f(draft)
      case _: Invoice => Update.reject(DomainError("invoice %s: not a draft invoice" format invoiceId))
    }

  /** Adds `invoiceItem` to the draft invoice stored under `invoiceId`. */
  def addInvoiceItem(invoiceId: String, invoiceItem: InvoiceItem): Future[DomainValidation[DraftInvoice]] =
    updateDraftInvoice(invoiceId)(_.addItem(invoiceItem))

  /** Sends the draft invoice stored under `invoiceId` to the address `to`. */
  def sendInvoiceTo(invoiceId: String, to: InvoiceAddress): Future[DomainValidation[SentInvoice]] =
    updateDraftInvoice(invoiceId)(_.sendTo(to))

  // ...

  /** Stores the updated invoice under its id, replacing any entry already there. */
  def project = {
    case (invoices, changed) => invoices + (changed.id -> changed)
  }
}
object InvoiceService {
  /**
   * Creates an invoice service.
   *
   * @param log     event log the service writes to
   * @param initial invoices the service starts with, keyed by id (empty by default)
   */
  def apply(log: EventLog, initial: Map[String, Invoice] = Map.empty) = new InvoiceService {
    val initialState: Map[String, Invoice] = initial
    val eventLog: EventLog = log
  }
}
/**
 * A single invoice managed as its own projection: the projected state is the
 * invoice itself, and every successful update replaces it.
 */
trait PersistentInvoice extends UpdateProjection[Invoice, Invoice] {
  /**
   * Runs `f` against the current invoice if it is a `DraftInvoice`; otherwise
   * the update is rejected with a `DomainError`.
   */
  def updateDraft[B <: Invoice](f: DraftInvoice => Update[InvoiceEvent, B]) = transacted {
    case draft: DraftInvoice => f(draft)
    case _ => Update.reject(DomainError("not a draft invoice"))
  }

  /** Adds `item` to the invoice, provided it is still a draft. */
  def addItem(item: InvoiceItem): Future[DomainValidation[DraftInvoice]] =
    updateDraft(_.addItem(item))

  /** Sends the invoice to `address`, provided it is still a draft. */
  def sendTo(address: InvoiceAddress): Future[DomainValidation[SentInvoice]] =
    updateDraft(_.sendTo(address))

  // ...

  /** The updated invoice simply becomes the new state; the old one is discarded. */
  def project = {
    case (_, updated) => updated
  }
}
object PersistentInvoice {
  /**
   * Creates a persistent invoice.
   *
   * @param log     event log the invoice writes to
   * @param initial the invoice to start from
   */
  def apply(log: EventLog, initial: Invoice) = new PersistentInvoice {
    val initialState: Invoice = initial
    val eventLog: EventLog = log
  }
}
// Variant of the invoice service that holds a map of PersistentInvoice
// instances rather than extending UpdateProjection itself.
trait InvoiceService {
// PersistentInvoice instances by String key (presumably the invoice id -- confirm).
val invoices: Map[String, PersistentInvoice]
// ...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment