Skip to content

Instantly share code, notes, and snippets.

@jboner
Created November 10, 2017 09:06
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jboner/4361c710d2e2386be8bddc39edb0528a to your computer and use it in GitHub Desktop.
Save jboner/4361c710d2e2386be8bddc39edb0528a to your computer and use it in GitHub Desktop.
Demo of an Event-driven Architecture in Akka and Java. Show-casing Events-first DDD, Event Sourced Aggregates, Process Manager, etc.
package sample.eventdriven.java;
import akka.actor.*;
import akka.persistence.AbstractPersistentActor;
import scala.concurrent.duration.Duration;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
// ===============================================================
// Demo of an Event-driven Architecture in Akka and Java.
//
// Show-casing:
// 1. Events-first Domain Driven Design using Commands and Events
// 2. Asynchronous communication through an Event Stream
// 3. Asynchronous Process Manager driving the workflow
// 4. Event-sourced Aggregates
//
// Used in my talk on 'How Events Are Reshaping Modern Systems'.
//
// NOTE: This is a very much simplified and dumbed down sample
// that is by no means a template for production use.
// F.e. in a real-world app I would not use Serializable
// but a JSON, Protobuf, Avro, or some other good lib.
// I would not use Akka's built in EventStream, but
// probably Kafka or Kinesis. Etc.
// ===============================================================
// =========================================================
// Commands
// =========================================================
interface Command extends Serializable {
}
class CreateOrder implements Command {
final int userId;
final int productId;
CreateOrder(int userId, int productId) {
this.userId = userId;
this.productId = productId;
}
}
class ReserveProduct implements Command {
final int userId;
final int productId;
ReserveProduct(int userId, int productId) {
this.userId = userId;
this.productId = productId;
}
}
class SubmitPayment implements Command {
final int userId;
final int productId;
SubmitPayment(int userId, int productId) {
this.userId = userId;
this.productId = productId;
}
}
class ShipProduct implements Command {
final int userId;
final int txId;
ShipProduct(int userId, int txId) {
this.userId = userId;
this.txId = txId;
}
}
// =========================================================
// Events
// =========================================================
interface Event extends Serializable {
}
class ProductReserved implements Event {
final int userId;
final int txId;
ProductReserved(int userId, int txId) {
this.userId = userId;
this.txId = txId;
}
}
class ProductOutOfStock implements Event {
final int userId;
final int productId;
ProductOutOfStock(int userId, int productId) {
this.userId = userId;
this.productId = productId;
}
}
class PaymentAuthorized implements Event {
final int userId;
final int txId;
PaymentAuthorized(int userId, int txId) {
this.userId = userId;
this.txId = txId;
}
}
class PaymentDeclined implements Event {
final int userId;
final int txId;
PaymentDeclined(int userId, int txId) {
this.userId = userId;
this.txId = txId;
}
}
class ProductShipped implements Event {
final int userId;
final int txId;
ProductShipped(int userId, int txId) {
this.userId = userId;
this.txId = txId;
}
}
class OrderCompleted implements Event {
final int userId;
final int txId;
OrderCompleted(int userId, int txId) {
this.userId = userId;
this.txId = txId;
}
}
// =========================================================
// Top-level service functioning as a Process Manager
// Coordinating the workflow on behalf of the Client
// =========================================================
class Orders extends AbstractActor {
final ActorRef client;
final ActorRef inventory;
final ActorRef payment;
public Orders(ActorRef client, ActorRef inventory, ActorRef payment) {
this.client = client;
this.inventory = inventory;
this.payment = payment;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(CreateOrder.class, cmd -> {
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
inventory.tell(new ReserveProduct(cmd.userId, cmd.productId), getSelf());
})
.match(ProductReserved.class, evt -> {
System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name());
payment.tell(new SubmitPayment(evt.userId, evt.txId), getSelf());
})
.match(PaymentAuthorized.class, evt -> {
System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name());
inventory.tell(new ShipProduct(evt.userId, evt.txId), getSelf());
})
.match(ProductShipped.class, evt -> {
System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name());
client.tell(new OrderCompleted(evt.userId, evt.txId), getSelf());
})
.build();
}
@Override
public void preStart() {
// Subscribe to Events from the Event Stream
getContext().system().eventStream().subscribe(getSelf(), ProductReserved.class);
getContext().system().eventStream().subscribe(getSelf(), ProductOutOfStock.class);
getContext().system().eventStream().subscribe(getSelf(), ProductShipped.class);
getContext().system().eventStream().subscribe(getSelf(), PaymentAuthorized.class);
getContext().system().eventStream().subscribe(getSelf(), PaymentDeclined.class);
}
}
// =========================================================
// Event Sourced Aggregate
// =========================================================
class Inventory extends AbstractPersistentActor {
@Override
public String persistenceId() {
return "inventory";
}
int nrOfProductsShipped = 0; // Mutable state, persisted in memory (AKA Memory Image)
Event reserveProduct(int userId, int productId) {
System.out.println("SIDE-EFFECT:\tReserving Product => " + getSelf().path().name());
return new ProductReserved(userId, productId);
}
Event shipProduct(int userId, int txId) {
nrOfProductsShipped += 1; // Update internal state
System.out.println("SIDE-EFFECT:\tShipping Product => " + getSelf().path().name() +
" - ProductsShipped: " + nrOfProductsShipped);
return new ProductShipped(userId, txId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ReserveProduct.class, cmd -> { // Receive ReserveProduct Command
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
Event productStatus = reserveProduct(cmd.userId, cmd.productId); // Try to reserve the product
persist(productStatus, evt -> { // Try to persist the Event
getContext().system().eventStream().publish(evt); // Publish Event to Event Stream
});
})
.match(ShipProduct.class, cmd -> { // Receive ShipProduct Command
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
Event shippingStatus = shipProduct(cmd.userId, cmd.txId); // Try to ship the product
persist(shippingStatus, evt -> { // Try to persist the Event
getContext().system().eventStream().publish(evt); // Publish Event to Event Stream
});
})
.build();
}
@Override
public Receive createReceiveRecover() {
return receiveBuilder()
.match(ProductReserved.class, evt -> { // Replay ProductReserved
System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name());
})
.match(ProductShipped.class, evt -> { // Replay ProductShipped
nrOfProductsShipped += 1; // Update the internal state
System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name() +
" - ProductsShipped: " + nrOfProductsShipped);
})
.build();
}
}
// =========================================================
// Event Sourced Aggregate
// =========================================================
class Payment extends AbstractPersistentActor {
@Override
public String persistenceId() {
return "payment";
}
int uniqueTransactionNr = 0; // Mutable state, persisted in memory (AKA Memory Image)
Event processPayment(int userId, int txId) {
uniqueTransactionNr += 1; // Update the internal state
System.out.println("SIDE-EFFECT:\tProcessing payment => " + getSelf().path().name() +
" - TxNumber: " + uniqueTransactionNr);
return new PaymentAuthorized(userId, uniqueTransactionNr);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(SubmitPayment.class, cmd -> { // Receive SubmitPayment Command
System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
Event paymentStatus = processPayment(cmd.userId, cmd.productId); // Try to pay product
persist(paymentStatus, evt -> { // Try to persist the Event
getContext().system().eventStream().publish(evt); // Publish Event to Event Stream
});
})
.build();
}
@Override
public Receive createReceiveRecover() {
return receiveBuilder()
.match(PaymentAuthorized.class, evt -> { // Replay PaymentAuthorized
uniqueTransactionNr += 1; // Update the internal state
System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name() +
" - TxNumber: " + uniqueTransactionNr);
})
.build();
}
}
// =========================================================
// Running the Order Management simulation
// =========================================================
public class OrderManagement {
public static void main(String... args) throws Exception {
// Create the Order Management actor system
final ActorSystem system = ActorSystem.create("OrderManagement");
// Plumbing for "client"
final Inbox clientInbox = Inbox.create(system);
final ActorRef client = clientInbox.getRef();
// Create the services
final ActorRef inventory = system.actorOf(Props.create(Inventory.class), "Inventory");
final ActorRef payment = system.actorOf(Props.create(Payment.class), "Payment");
final ActorRef orders = system.actorOf(Props.create(Orders.class, client, inventory, payment), "Orders");
// Send a CreateOrder Command to the Orders service
clientInbox.send(orders, new CreateOrder(9, 1337));
try {
// Wait for the order confirmation
Object confirmation = clientInbox.receive(Duration.create(5, TimeUnit.SECONDS));
System.out.println("EVENT:\t\t\t" + confirmation + " => Client");
} catch (TimeoutException e) {
System.out.println("Waited 5 seconds for the OrderCompleted event, giving up...");
}
System.out.println("Order completed. Shutting down system.");
system.terminate();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment