Skip to content

Instantly share code, notes, and snippets.

@allain
Last active July 25, 2020 23:34
Show Gist options
  • Save allain/fd29be358a8749a387945cd3962185da to your computer and use it in GitHub Desktop.
Save allain/fd29be358a8749a387945cd3962185da to your computer and use it in GitHub Desktop.
Dreamcoding an Event Sourced DDD JavaScript Library
import { App, Event, Problem, Reducer, Validator } from '.'
const app = App()
// an reducer middleware that places the reduction on the context
const statsReducer = Reducer(
'stats',
({count, total, min, max}, event) => {
switch (event.name) {
case 'recorded':
return {
count: count + 1,
total: total + event.amount,
min: Math.min(event.amount, min),
max: Math.max(event.amount, max)
}
}
},
{ count: 0, total: 0, min: Number.
)
// middleware that can check the payload. used for both deposit and withdraw requests
const transactionValidator = Validator({
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
accountId: {
type: 'string'
},
amount: {
type: 'number'
}
},
additionalProperties: false
})
// app level commands that delegate to other aggregates
app.command(
'deposit',
transactionValidator,
async ({ payload: { accountId, amount } }) =>
// delegate to an account aggregate
this.request(`/account/${accountId}`, 'deposit', { amount })
)
app.command(
'withdraw',
transactionValidator,
({ payload: { accountId, amount } }) =>
this.request(`/account/${accountId}`, 'withdraw', { amount })
)
// middleware that can check the payload. used for both deposit and withdraw requests
const accountTransactionValidator = Validator({
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
amount: {
type: 'number'
}
},
additionalProperties: false
})
// define an aggregate for each account with commands deposit and withdraw
app
.aggregate('/account/:id')
.command(
'deposit',
accountTransactionValidator,
({ payload }) => Event('deposited', { amount: payload.amount })
)
.command(
'withdraw',
accountTransactionValidator,
balanceReducer,
({ payload: { amount }, context: { balance } }) => {
if (balance < amount) return new Problem('low-balance')
return new Event('withdrawn', { amount })
}
)
// send a request to the app directly, which will be delegated to other aggregates
// all events created during the request are considered part of a transaction,
// when the request returns successfully, all events created while processing are then committed.
await app.request('deposit', { accountId: '123', amount: 20 })
// send a request directly to an aggregate
await app.request('/account/123', 'deposit', { amount: 50 })
await app.request('/account/123', 'withdraw', { amount: 100 })
import { App, Event, Problem, Reducer, Validator } from '.'
const app = App()
const RECORD_SIGNAL_SCHEMA = {
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
payload: {
type: 'object',
properties: {
signal: {
type: 'string'
},
value: {
type: 'number'
}
},
additionalProperties: false
}
}
}
// app level command that delegates to an internal aggregate
app.command(
'recordSignal',
// middleware that ensures the recordSignal request conforms to expected schema
Validator(RECORD_SIGNAL_SCHEMA), // Returns a Problem if it doesn't conform
function ({ payload: { signal, value } }) {
// delegate to a signal aggregate
return this.request(`/signal/${signal}`, 'record', { value })
}
)
// define an aggregate for each signal. It uses middleware to compute as signal summary
// since things are internal here, validator middleware does not need to be added
app.aggregate('/signal/:name').command(
'record',
Reducer(
'count', // this is the prop name that will be exposed in the context
(count, event) => {
switch (event.name) {
case 'recorded':
return count + 1
}
},
0
),
({ payload: { value }, context: { count } }) => {
// enforce a rule that says we can't have more than 1000 signals recorded
if (count >= 1000) return new Problem('limit-reached')
return new Event('recorded', { value })
}
)
// requests are send directly to the app, never to the aggregates directly
await app.request('recordSignal', { signal: 'temperature', value: 26 })
// Uses command and answer to define commands and answerables.
// Not using query since it reads as if you're asking the question rather than defining an answerable
// uses act, ask, and watch to interact with the system once it's setup
// commands and answers are almost the same thing except that answers must return a value, and cannot return Events
// Additionally, an answer can be watched, but a command cannot be.
import { App, Event, Problem, Reducer, Validator } from '.'
const app = App()
const COMMAND_RECORD_SIGNAL_SCHEMA = {
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
payload: {
type: 'object',
properties: {
signal: {
type: 'string'
},
value: {
type: 'number'
}
},
additionalProperties: false
}
}
}
const QUESTION_TOTAL_SIGNAL_SCHEMA = {
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
payload: {
type: 'object',
properties: {
signal: {
type: 'string'
}
},
additionalProperties: false
}
}
}
// app level command that delegates to an internal aggregate
app.command(
'recordSignal',
// middleware that ensures the recordSignal request conforms to expected schema
Validator(COMMAND_RECORD_SIGNAL_SCHEMA), // Returns a Problem if it doesn't conform
function ({ payload: { signal, value } }) {
// delegate to a signal aggregate
return this.request(`/signal/${signal}`, 'record', { value })
}
)
// register an answer that can be queried or watched (see below)
app.answer(
'countSignal',
Validator(QUESTION_TOTAL_SIGNAL_SCHEMA),
({ payload: { signal } }) => this.query(`/signal/${signal}`, 'total')
)
// define an aggregate for each signal. It uses middleware to compute as signal summary
// since things are internal here, validator middleware does not need to be added
app
.aggregate(
'/signal/:name',
Reducer(
'count', // this is the prop name that will be exposed in the context
(count, event) => {
switch (event.name) {
case 'recorded':
return count + 1
}
},
0
)
)
.command('record', ({ payload: { value }, context: { count } }) => {
// enforce a rule that says we can't have more than 1000 signals recorded
if (count >= 1000) return new Problem('limit-reached')
return new Event('recorded', { value })
})
.answer('count', ({ context: { count } }) => count)
// requests are sent directly to the app, never to the aggregates directly
// request a command, they will always
await app.act('recordSignal', { signal: 'temperature', value: 26 })
await app.act('recordSignal', { signal: 'temperature', value: 22 })
await app.ask('countSignal', { signal: 'temperature' }) // resolves to 2
// Watching an answer
const observable = app.watch('countSignal', { signal: 'temperature' })
// logs 2 to the console
observable.observe(console.log)
// logs 3 to the console
await app.act('recordSignal', { signal: 'temperature', value: 24 })
import { Aggregate, Event, Problem, Reducer, Validator, State } from './okur'
const root = Aggregate()
const COMMAND_RECORD_SIGNAL_SCHEMA = {
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
payload: {
type: 'object',
properties: {
signal: {
type: 'string'
},
value: {
type: 'number'
}
},
additionalProperties: false
}
}
}
const QUESTION_TOTAL_SIGNAL_SCHEMA = {
$schema: 'https://json-schema.org/draft/2019-09/schema',
type: 'object',
properties: {
payload: {
type: 'object',
properties: {
signal: {
type: 'string'
}
},
additionalProperties: false
}
}
}
// app level command that delegates to an internal aggregate
root.command(
'recordSignal',
// middleware that ensures the recordSignal request conforms to expected schema
Validator(COMMAND_RECORD_SIGNAL_SCHEMA), // Returns a Problem if it doesn't conform
function ({ payload: { signal, value } }) {
// delegate to a signal aggregate
return this.aggregate(`signal/${signal}`).request('record', { value })
}
)
// register an answer that can be asked and/or watched (see below)
root.answer(
'countSignal',
Validator(QUESTION_TOTAL_SIGNAL_SCHEMA),
({ payload: { signal } }) => this.aggregate(`signal/${signal}`).query('total')
)
// define an aggregate for each signal. It uses middleware to compute as signal summary
// since things are internal here, validator middleware does not need to be added
root
.aggregate(
'signal/:name',
// middleware defined here is applied to all commands and
Reducer(
'count', // this is the prop name that will be exposed in the context
(count, event) => {
switch (event.name) {
case 'recorded':
return count.update(val => val + 1)
}
},
new State(0)
)
)
.command('record', ({ payload: { value }, context: { count } }) => {
// enforce a rule that says we can't have more than 1000 signals recorded
if (count >= 1000) return new Problem('limit-reached')
return new Event('recorded', { value })
})
.answer('count', ({ context: { count } }) => count)
// requests are sent directly to the app, never to the aggregates directly
// request a command, they will always
await root.act('recordSignal', { signal: 'temperature', value: 26 })
await root.act('recordSignal', { signal: 'temperature', value: 22 })
await root.ask('countSignal', { signal: 'temperature' }) // resolves to 2
// Watching an answer
const observable = root.watch('countSignal', { signal: 'temperature' })
// logs 2 to the console
observable.observe(console.log)
// logs 3 to the console
await root.act('recordSignal', { signal: 'temperature', value: 24 })
import { App, Problem } from './okur'
const root = new App()
root.use(usersContext)
function usersContext (root) {
// augment the root context with registerUser command
root.command(
'registerUser',
//Validator({ full JSON Schema here }),
async function ({ payload: { email, password, passwordConfirmation } }) {
if (password !== passwordConfirmation) {
return new Problem('password-confirmation-mismatch')
}
await this.aggregate(`users/${userId}`).event('created', {
email,
password
})
}
)
root.command(
'updateUser',
// Validator ({full JSON schema here })
function ({ payload: { userId, ...rest } }) {
return this.aggregate(`users:${userId}`).request('update', rest)
}
)
root.command('deleteUser', function ({ payload: { userId } }) {
return this.aggregate(`users:${userId}`).request('delete')
})
const userEmailsReducer = new Reducer(
'userEmails',
(emailMap, { type, aggregateId, payload: { email } }) => {
switch (event.type) {
case 'created':
emailMap[aggregateId] = email
break
case 'updated':
if (email) {
emailMap[aggregateId] = email
}
break
case 'deleted':
delete emailMap[aggregateId]
break
}
return emailMap
},
{},
'/users/:userId'
)
root
.factory(
'users/:userId',
Reducer(
'user',
(user, { type, payload }) => {
switch (type) {
case 'created':
return payload
case 'updated':
for (const [prop, val] of Object.entries(payload)) {
user[prop] = val
}
return user
default:
return user
}
},
{}
)
)
.command('update', userEmailsReducer, async function (
{ payload: { email, password, passwordConfirm, ...update } },
{ userEmails, user }
) {
if (email) {
const userIdWithEmail = userEmails[email]
if (userIdWithEmail && userIdWithEmail !== user.id) {
return this.problem('email.exists')
}
}
if (password && password !== passwordConfirm) {
return this.problem('passwords.unmatched')
}
return this.event('updated', update)
})
// note: userId comes from the route patterns
.command('delete', userEmailsReducer, async function ({
params: { userId }
}) {
if (userEmails[userId]) {
return this.event('deleted')
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment