Skip to content

Instantly share code, notes, and snippets.

@jboner
Created November 10, 2017 09:08
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save jboner/dea4aa819e127d543d684208d74081b5 to your computer and use it in GitHub Desktop.
Save jboner/dea4aa819e127d543d684208d74081b5 to your computer and use it in GitHub Desktop.
Demo of an Event-driven Architecture in Akka and Scala. Show-casing Events-first DDD, Event Sourced Aggregates, Process Manager, etc.
package sample.eventdriven.scala
import akka.actor.{Actor, ActorRef, ActorSystem, Inbox, Props}
import akka.persistence.PersistentActor
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
// ===============================================================
// Demo of an Event-driven Architecture in Akka and Scala.
//
// Show-casing:
// 1. Events-first Domain Driven Design using Commands and Events
// 2. Asynchronous communication through an Event Stream
// 3. Asynchronous Process Manager driving the workflow
// 4. Event-sourced Aggregates
//
// Used in my talk on 'How Events Are Reshaping Modern Systems'.
//
// NOTE: This is a very much simplified and dumbed down sample
// that is by no means a template for production use.
// F.e. in a real-world app I would not use Serializable
// but a JSON, Protobuf, Avro, or some other good lib.
// I would not use Akka's built in EventStream, but
// probably Kafka or Kinesis. Etc.
// ===============================================================
object OrderManagement extends App {
// =========================================================
// Commands
// =========================================================
sealed trait Command
case class CreateOrder(userId: Int, productId: Int) extends Command
case class ReserveProduct(userId: Int, productId: Int) extends Command
case class SubmitPayment(userId: Int, productId: Int) extends Command
case class ShipProduct(userId: Int, txId: Int) extends Command
// =========================================================
// Events
// =========================================================
sealed trait Event
case class ProductReserved(userId: Int, txId: Int) extends Event
case class ProductOutOfStock(userId: Int, productId: Int) extends Event
case class PaymentAuthorized(userId: Int, txId: Int) extends Event
case class PaymentDeclined(userId: Int, txId: Int) extends Event
case class ProductShipped(userId: Int, txId: Int) extends Event
case class OrderCompleted(userId: Int, txId: Int) extends Event
// =========================================================
// Top-level service functioning as a Process Manager
// Coordinating the workflow on behalf of the Client
// =========================================================
class Orders(client: ActorRef, inventory: ActorRef, payment: ActorRef) extends Actor {
override def preStart = {
system.eventStream.subscribe(self, classOf[ProductReserved]) // Subscribe to ProductReserved Events
system.eventStream.subscribe(self, classOf[ProductOutOfStock]) // Subscribe to ProductOutOfStock Events
system.eventStream.subscribe(self, classOf[ProductShipped]) // Subscribe to ProductShipped Events
system.eventStream.subscribe(self, classOf[PaymentAuthorized]) // Subscribe to PaymentAuthorized Events
system.eventStream.subscribe(self, classOf[PaymentDeclined]) // Subscribe to PaymentDeclined Events
}
def receive = {
case cmd: CreateOrder => // 1. Receive CreateOrder Command
inventory.tell(ReserveProduct(cmd.userId, cmd.productId), self) // 2. Send ReserveProduct Command to Inventory
println(s"COMMAND:\t\t$cmd => ${self.path.name}")
case evt: ProductReserved => // 3. Receive ProductReserved Event
payment.tell(SubmitPayment(evt.userId, evt.txId), self) // 4. Send SubmitPayment Command to Payment
println(s"EVENT:\t\t\t$evt => ${self.path.name}")
case evt: PaymentAuthorized => // 5. Receive PaymentAuthorized Event
inventory.tell(ShipProduct(evt.userId, evt.txId), self) // 6. Send ShipProduct Command to Inventory
println(s"EVENT:\t\t\t$evt => ${self.path.name}")
case evt: ProductShipped => // 7. Receive ProductShipped Event
client.tell(OrderCompleted(evt.userId, evt.txId), self) // 8. Send OrderCompleted Event back to Client
println(s"EVENT:\t\t\t$evt => ${self.path.name}")
}
}
// =========================================================
// Event Sourced Aggregate
// =========================================================
class Inventory extends PersistentActor {
val persistenceId = "Inventory"
var nrOfProductsShipped = 0 // Mutable state, persisted in memory (AKA Memory Image)
def reserveProduct(userId: Int, productId: Int): Event = {
println(s"SIDE-EFFECT:\tReserving Product => ${self.path.name}")
ProductReserved(userId, productId)
}
def shipProduct(userId: Int, txId: Int): Event = {
nrOfProductsShipped += 1 // Update internal state
println(s"SIDE-EFFECT:\tShipping Product => ${self.path.name}" + " - ProductsShipped: " + nrOfProductsShipped)
ProductShipped(userId, txId)
}
def receiveCommand = {
case cmd: ReserveProduct => // Receive ReserveProduct Command
val productStatus = reserveProduct(cmd.userId, cmd.productId) // Try to reserve the product
persist(productStatus) { event => // Try to persist the Event
context.system.eventStream.publish(event) // If successful, publish Event to Event Stream
}
println(s"COMMAND:\t\t$cmd => ${self.path.name}")
case cmd: ShipProduct => // Receive ShipProduct Command
val shippingStatus = shipProduct(cmd.userId, cmd.txId) // Try to ship the product
persist(shippingStatus) { event => // Try to persist the Event
context.system.eventStream.publish(event) // If successful, publish Event to Event Stream
}
println(s"COMMAND:\t\t$cmd => ${self.path.name}")
}
def receiveRecover = {
case event: ProductReserved => // Replay the ProductReserved events
println(s"EVENT (REPLAY):\t$event => ${self.path.name}")
case event: ProductShipped => // Replay the ProductShipped events
nrOfProductsShipped += 1
println(s"EVENT (REPLAY):\t$event => ${self.path.name} - ProductsShipped: $nrOfProductsShipped")
}
}
// =========================================================
// Event Sourced Aggregate
// =========================================================
class Payment extends PersistentActor {
val persistenceId = "Payment"
var uniqueTransactionNr = 0 // Mutable state, persisted in memory (AKA Memory Image)
def processPayment(userId: Int, txId: Int): Event = {
uniqueTransactionNr += 1
println(s"SIDE-EFFECT:\tProcessing Payment => ${self.path.name} - TxNumber: $uniqueTransactionNr")
PaymentAuthorized(userId, uniqueTransactionNr)
}
def receiveCommand = {
case cmd: SubmitPayment => // Receive SubmitPayment Command
val paymentStatus = processPayment(cmd.userId, cmd.productId) // Try to pay product
persist(paymentStatus) { event => // Try to persist Event
context.system.eventStream.publish(event) // If successful, publish Event to Event Stream
}
println(s"COMMAND:\t\t$cmd => ${self.path.name}")
}
def receiveRecover = {
case evt: PaymentAuthorized => // Replay the PaymentAuthorized events
uniqueTransactionNr += 1
println(s"EVENT (REPLAY):\t$evt => ${self.path.name} - TxNumber: $uniqueTransactionNr")
case evt: PaymentDeclined => // Replay the PaymentDeclined events
println(s"EVENT (REPLAY):\t$evt => ${self.path.name}")
}
}
// =========================================================
// Running the Order Management simulation
// =========================================================
val system = ActorSystem("OrderManagement")
// Plumbing for "client"
val clientInbox = Inbox.create(system)
val client = clientInbox.getRef()
// Create the services (cheating with "DI" by exploiting enclosing object scope)
val inventory = system.actorOf(Props(classOf[Inventory]), "Inventory")
val payment = system.actorOf(Props(classOf[Payment]), "Payment")
val orders = system.actorOf(Props(classOf[Orders], client, inventory, payment), "Orders")
// Submit an order
clientInbox.send(orders, CreateOrder(9, 1337)) // Send a CreateOrder Command to the Orders service
clientInbox.receive(5.seconds) match { // Wait for OrderCompleted Event
case confirmation: OrderCompleted =>
println(s"EVENT:\t\t\t$confirmation => Client")
}
system.terminate().foreach(_ => println("System has terminated"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment