Created
January 20, 2012 16:52
-
-
Save krasserm/1648358 to your computer and use it in GitHub Desktop.
Building an event-sourced web application with Akka - Part 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.dispatch._ | |
import akka.stm._ | |
import scalaz._ | |
/**
 * Contract for a state of type `S` that is advanced by values of type `A`.
 *
 * @tparam S type of the projected state
 * @tparam A type of the values that are applied to the state
 */
trait Projection[S, A] {

  /** The state the projection starts from. */
  def initialState: S

  /** The state the projection holds right now. */
  def currentState: S

  /**
   * Maps a `(state, value)` pair to the next state. It is a partial function,
   * so it may be undefined for some pairs.
   */
  def project: PartialFunction[(S, A), S]
}
/**
 * A [[Projection]] that keeps its state in an STM `Ref` and changes it only
 * from a single private updater actor.
 *
 * Callers submit updates through [[transacted]] and receive a future that is
 * completed by the updater once the update has been processed.
 *
 * @tparam S type of the projected state
 * @tparam A type of the values that successful updates produce
 */
trait UpdateProjection[S, A] extends Projection[S, A] {
  // Both are lazy so that `initialState`, which is supplied by the concrete
  // subclass, is not read before it has been initialized.
  private lazy val ref = Ref(initialState)
  private lazy val updater = Actor.actorOf(new Updater).start

  /** Event log that receives the events produced by successful updates. */
  def eventLog: EventLog

  /**
   * Selects the logging mode: when `true` the updater gets its own
   * thread-based dispatcher and calls `eventLog.append`, otherwise it calls
   * `eventLog.appendAsync`.
   */
  def writeAhead: Boolean = true

  /** Reads the state currently stored in the `Ref`. */
  def currentState: S = ref()

  /**
   * Submits `update` to the updater actor.
   *
   * @param update function from the state seen by the updater to the update to run
   * @return a future holding the validation result of the update; it is
   *         completed with an exception if processing the update throws
   */
  def transacted[B <: A](update: S => Update[Event, B]): Future[DomainValidation[B]] = {
    val promise = new DefaultCompletableFuture[DomainValidation[B]]
    // The cast widens the promise's result type from B to A so that it fits
    // ApplyUpdate; the updater only ever completes it with the update's own result.
    def dispatch = updater ! ApplyUpdate(update, promise.asInstanceOf[CompletableFuture[DomainValidation[A]]])
    if (Stm.activeTransaction) {
      currentState // join
      // NOTE(review): `deferred` presumably postpones the dispatch until the
      // enclosing transaction commits -- confirm against the Akka STM docs.
      deferred(dispatch)
    } else {
      dispatch
    }
    promise
  }

  /** Message that carries one update and the promise to complete with its outcome. */
  private case class ApplyUpdate(update: S => Update[Event, A], promise: CompletableFuture[DomainValidation[A]])

  /** Actor that applies updates one message at a time. */
  private class Updater extends Actor {
    if (writeAhead) self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)

    def receive = {
      case ApplyUpdate(u, p) => {
        try {
          val current = currentState
          val update = u(current)
          update() match {
            case (events, s @ Success(result)) => {
              // Events are logged before the new state is stored, so a failing
              // synchronous append leaves the state untouched.
              log(events.reverse)
              ref set project(current, result.asInstanceOf[A])
              p.completeWithResult(s)
            }
            case (_, f) => {
              p.completeWithResult(f)
            }
          }
        } catch {
          case e: Exception => {
            // Without this the caller's future would never be completed and
            // could only time out. Rethrow so that the actor's regular failure
            // handling still runs as before.
            // Failures inside `appendAsync` are not raised here and are therefore
            // still unhandled.
            p.completeWithException(e)
            throw e
          }
        }
      }
    }

    /** Appends each event to the event log using the mode selected by `writeAhead`. */
    def log(events: List[Event]) = events.foreach { event =>
      if (writeAhead) eventLog.append(event) else eventLog.appendAsync(event)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.dispatch._ | |
/**
 * Invoice operations on top of an [[UpdateProjection]] whose state is a map
 * of invoices keyed by invoice id.
 */
trait InvoiceService extends UpdateProjection[Map[String, Invoice], Invoice] {
  import InvoiceService._

  /** Looks up the invoice stored under `invoiceId` in the current state. */
  def getInvoice(invoiceId: String): Option[Invoice] =
    currentState get invoiceId

  /** All invoices contained in the current state. */
  def getInvoices: Iterable[Invoice] =
    currentState.values

  /**
   * Runs `f` against the invoice stored under `invoiceId`, or rejects the
   * update if no such invoice exists.
   */
  def updateInvoice[B <: Invoice](invoiceId: String)(f: Invoice => Update[InvoiceEvent, B]) =
    transacted { invoices =>
      invoices.get(invoiceId) match {
        case None           => Update.reject(DomainError("invoice %s: does not exist".format(invoiceId)))
        case Some(existing) => f(existing)
      }
    }

  /**
   * Like [[updateInvoice]], but additionally rejects the update if the stored
   * invoice is not a `DraftInvoice`.
   */
  def updateDraftInvoice[B <: Invoice](invoiceId: String)(f: DraftInvoice => Update[InvoiceEvent, B]) =
    updateInvoice(invoiceId) {
      case draft: DraftInvoice => f(draft)
      case _: Invoice          => Update.reject(DomainError("invoice %s: not a draft invoice".format(invoiceId)))
    }

  /** Adds `invoiceItem` to the draft invoice stored under `invoiceId`. */
  def addInvoiceItem(invoiceId: String, invoiceItem: InvoiceItem): Future[DomainValidation[DraftInvoice]] =
    updateDraftInvoice(invoiceId)(_.addItem(invoiceItem))

  /** Sends the draft invoice stored under `invoiceId` to the address `to`. */
  def sendInvoiceTo(invoiceId: String, to: InvoiceAddress): Future[DomainValidation[SentInvoice]] =
    updateDraftInvoice(invoiceId)(_.sendTo(to))

  // ...

  /** Stores the updated invoice under its own id, replacing any previous entry. */
  def project = {
    case (invoices, changed) => invoices + (changed.id -> changed)
  }
}
/** Factory for [[InvoiceService]] instances. */
object InvoiceService {

  /**
   * Creates a service that uses `log` as its event log and starts from the
   * invoices in `initial` (empty by default).
   */
  def apply(log: EventLog, initial: Map[String, Invoice] = Map.empty) =
    new InvoiceService {
      val initialState = initial
      val eventLog = log
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A single invoice managed as an [[UpdateProjection]]: the projected state is
 * the invoice itself.
 */
trait PersistentInvoice extends UpdateProjection[Invoice, Invoice] {

  /**
   * Runs `f` against the current invoice if it is a `DraftInvoice`; any other
   * state is rejected.
   */
  def updateDraft[B <: Invoice](f: DraftInvoice => Update[InvoiceEvent, B]) =
    transacted {
      case draft: DraftInvoice => f(draft)
      case _                   => Update.reject(DomainError("not a draft invoice"))
    }

  /** Adds `item` to the invoice, provided it is still a draft. */
  def addItem(item: InvoiceItem): Future[DomainValidation[DraftInvoice]] =
    updateDraft(_.addItem(item))

  /** Sends the invoice to `address`, provided it is still a draft. */
  def sendTo(address: InvoiceAddress): Future[DomainValidation[SentInvoice]] =
    updateDraft(_.sendTo(address))

  // ...

  /** The updated invoice simply replaces the previous state. */
  def project = {
    case (_, updated) => updated
  }
}
/** Factory for [[PersistentInvoice]] instances. */
object PersistentInvoice {

  /** Creates a persistent invoice that uses `log` as its event log and starts from `initial`. */
  def apply(log: EventLog, initial: Invoice) =
    new PersistentInvoice {
      val initialState = initial
      val eventLog = log
    }
}
/** Invoice service variant that holds a [[PersistentInvoice]] for each map key. */
trait InvoiceService {

  /**
   * The managed persistent invoices.
   * NOTE(review): keys are presumably invoice ids -- confirm against implementations.
   */
  val invoices: Map[String, PersistentInvoice]

  // ...
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment