Skip to content

Instantly share code, notes, and snippets.

@sam-n-johnston
Last active January 30, 2024 23:41
Show Gist options
  • Save sam-n-johnston/0f5aeb1ba6ff1e29707f5a39af0bb813 to your computer and use it in GitHub Desktop.
Save sam-n-johnston/0f5aeb1ba6ff1e29707f5a39af0bb813 to your computer and use it in GitHub Desktop.
Implementing an Event-Sourced System
class Order {
public id: string;
// Used to keep track of new events for this aggregate
private _events: Event[] = [];
private version: number = -1;
private shippingAddress: Address;
private billingAddress: Address;
private items: Item[];
private shippingMethod: string;
public get events(): Event[] {
return this._events
}
public place(
shippingAddress: Address,
billingAddress: Address,
items: Item[],
shippingMethod: string,
) {
// perform validation, checks & business rules. Don't change any properties here
// then record that the event happened
this.recordEvent(
new OrderPlaced(
this.id,
// Event data
{
shippingAddress,
billingAddress,
items,
shippingMethod,
}
)
)
}
public capturePayment() {
// Similar to the "place" method
}
public shipPackage() {
// Similar to the "place" method
}
private recordEvent(event: Event) {
this.when(event);
this._events.push(event);
}
public when(event: Event) {
switch (event.constructor) {
case OrderPlaced:
this.whenOrderPlaced(event);
case PaymentCaptured:
// ...
case PackageShipped:
// ...
default:
throw new Error(`Unprocessable event: ${event.toJSON()}`)
}
}
protected whenOrderPlaced(event: OrderPlaced) {
/**
* Here we update the properties of this instance without
* checking business rules.
* This will be used both during processing commands
* and when the repository is rebuilding the current aggregate
*/
this.shippingAddress = event.data.shippingAddress;
this.billingAddress = event.data.billingAddress;
this.items = event.data.items;
this.shippingMethod = event.data.shippingMethod;
}
public clearEvents() {
this.version += this._events.length;
this._events = [];
}
public incrementVersion() {
this.version++;
}
}
async function projector(event: OrderPlaced | PaymentCaptured | PackageShipped): Promise<void> => {
switch (event.constructor) {
case OrderPlaced:
case PaymentCaptured:
case PackageShipped:
const order = await this.orderRepository.find(event.aggregateId, event.sequenceNumber);
await this.orderProjectionRepository.save(order);
default:
throw new Error(`Unprocessable event: ${event.toJSON()}`)
}
}
class DynamodbOrderRepository implements OrderRepository {
public async save(order: Order): Promise<void> {
// Save events, ideally in a single transaction. Don't forget to increment the events' seqence numbers here!
await myDataBaseEngine.insert(buildInsertCommand(order.events));
// This removes the pending events from the aggregate and updates the aggregate version
order.clearEvents();
}
public async find (orderId: string): Promise<Order> {
const rawEvents: DatabaseEvent[] = await myDataBaseEngine.query(someQuery);
// rebuild event class instances
const events: Event[] = rawEvents.map(rawEvent => mapToEventClass(rawEvent))
const newOrder = new Order();
// Rebuild the aggregate's current state
events.forEach(event => {
newOrder.when(event);
newOrder.incrementVersion();
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment