Last active
January 30, 2024 23:41
-
-
Save sam-n-johnston/0f5aeb1ba6ff1e29707f5a39af0bb813 to your computer and use it in GitHub Desktop.
Implementing an Event-Sourced System
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Order { | |
public id: string; | |
// Used to keep track of new events for this aggregate | |
private _events: Event[] = []; | |
private version: number = -1; | |
private shippingAddress: Address; | |
private billingAddress: Address; | |
private items: Item[]; | |
private shippingMethod: string; | |
public get events(): Event[] { | |
return this._events | |
} | |
public place( | |
shippingAddress: Address, | |
billingAddress: Address, | |
items: Item[], | |
shippingMethod: string, | |
) { | |
// perform validation, checks & business rules. Don't change any properties here | |
// then record that the event happened | |
this.recordEvent( | |
new OrderPlaced( | |
this.id, | |
// Event data | |
{ | |
shippingAddress, | |
billingAddress, | |
items, | |
shippingMethod, | |
} | |
) | |
) | |
} | |
public capturePayment() { | |
// Similar to the "place" method | |
} | |
public shipPackage() { | |
// Similar to the "place" method | |
} | |
private recordEvent(event: Event) { | |
this.when(event); | |
this._events.push(event); | |
} | |
public when(event: Event) { | |
switch (event.constructor) { | |
case OrderPlaced: | |
this.whenOrderPlaced(event); | |
case PaymentCaptured: | |
// ... | |
case PackageShipped: | |
// ... | |
default: | |
throw new Error(`Unprocessable event: ${event.toJSON()}`) | |
} | |
} | |
protected whenOrderPlaced(event: OrderPlaced) { | |
/** | |
* Here we update the properties of this instance without | |
* checking business rules. | |
* This will be used both during processing commands | |
* and when the repository is rebuilding the current aggregate | |
*/ | |
this.shippingAddress = event.data.shippingAddress; | |
this.billingAddress = event.data.billingAddress; | |
this.items = event.data.items; | |
this.shippingMethod = event.data.shippingMethod; | |
} | |
public clearEvents() { | |
this.version += this._events.length; | |
this._events = []; | |
} | |
public incrementVersion() { | |
this.version++; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async function projector(event: OrderPlaced | PaymentCaptured | PackageShipped): Promise<void> => { | |
switch (event.constructor) { | |
case OrderPlaced: | |
case PaymentCaptured: | |
case PackageShipped: | |
const order = await this.orderRepository.find(event.aggregateId, event.sequenceNumber); | |
await this.orderProjectionRepository.save(order); | |
default: | |
throw new Error(`Unprocessable event: ${event.toJSON()}`) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DynamodbOrderRepository implements OrderRepository { | |
public async save(order: Order): Promise<void> { | |
// Save events, ideally in a single transaction. Don't forget to increment the events' seqence numbers here! | |
await myDataBaseEngine.insert(buildInsertCommand(order.events)); | |
// This removes the pending events from the aggregate and updates the aggregate version | |
order.clearEvents(); | |
} | |
public async find (orderId: string): Promise<Order> { | |
const rawEvents: DatabaseEvent[] = await myDataBaseEngine.query(someQuery); | |
// rebuild event class instances | |
const events: Event[] = rawEvents.map(rawEvent => mapToEventClass(rawEvent)) | |
const newOrder = new Order(); | |
// Rebuild the aggregate's current state | |
events.forEach(event => { | |
newOrder.when(event); | |
newOrder.incrementVersion(); | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment