Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Deciders implementation
/// +--------------------------------------+
/// | |
/// | Decider |
/// | |
/// | Jérémie Chassaing |
/// | @thinkb4coding |
/// +--------------------------------------+
// A decider is a structure defined by 7 parameters:
// 1. a Command type
// 2. an Event type
// 3. a State type
// 4. an initialState value of type State
// 5. a decide function: Command -> State -> Event list
// 6. an evolve function: State -> Event -> State
// 7. an isTerminal function: State -> bool
/// The actions that can be requested of the decider.
/// Each case is named with a verb in the imperative.
type Command =
    /// Build a bulb; `maxUses` becomes its initial remaining-uses counter.
    | Build of maxUses: int
    /// Ask for the bulb to be switched on.
    | SwitchOn
    /// Ask for the bulb to be switched off.
    | SwitchOff
/// The effects of the actions done on the decider.
/// Each case is named with a verb in the past tense.
type Event =
    /// A bulb was built with the given `maxUses`.
    | Built of maxUses: int
    | SwitchedOn
    | SwitchedOff
    /// The bulb broke.
    | Broke
/// Whether a working bulb is currently switched on or off.
type BulbState =
    | On
    | Off

/// The information needed to take the next decision.
type State =
    /// The state before anything has happened.
    | NotBuilt
    /// A usable bulb: its on/off position and how many uses remain.
    | Working of BulbState * remainingUses: int
    /// The bulb is broken.
    | Broken
/// The value of the state before any event has happened.
let initialState: State = NotBuilt
/// The business rules of the bulb.
///
/// Read it as: "when asked to do this Command while in this State,
/// here are the Events that happen".
///
/// Returns the list of resulting events; the list is empty when the
/// command has no effect in the given state.
let decide (cmd: Command) (state: State) =
    match state, cmd with
    | NotBuilt, Build uses -> [ Built uses ]
    // No uses left: switching on breaks the bulb. The guard is `<= 0`
    // rather than an exact match on 0 so that a counter that started
    // negative (e.g. after `Build -1`) also breaks instead of being
    // decremented forever without ever reaching 0.
    | Working(Off, remaining), SwitchOn when remaining <= 0 -> [ Broke ]
    | Working(Off, _), SwitchOn -> [ SwitchedOn ]
    | Working(On, _), SwitchOff -> [ SwitchedOff ]
    // Every other state/command combination does nothing.
    | _ -> []
/// Computes the next State from the previous State and one Event.
/// The resulting state is what the next call to `decide` will see.
///
/// An event that does not apply to the given state leaves it unchanged.
let evolve (state: State) (event: Event) =
    match event, state with
    | Built maxUses, NotBuilt -> Working(Off, maxUses)
    // Each switch-on consumes one remaining use.
    | SwitchedOn, Working(Off, remaining) -> Working(On, remaining - 1)
    | SwitchedOff, Working(On, remaining) -> Working(Off, remaining)
    | Broke, Working _ -> Broken
    | _ -> state
/// Tells whether the decider has reached its final state.
/// Once this returns true nothing will happen to the decider anymore,
/// so it can be archived or deleted.
let isTerminal (state: State) =
    match state with
    | Broken -> true
    | NotBuilt
    | Working _ -> false
// notice at this point that there is no notion of identity.
// identity is managed at the application layer
/// BDD-style tests for the decider, built on two small infix operators.
module Tests =
    // A test is written as
    //
    //     [ Past events ]
    //     => Command
    //     =! [ Expected events ]
    //
    // and reads as:
    //     Given  <Past events>
    //     When   <Command>
    //     Expect <Expected events>

    /// "Given / When": takes the past events on the left and a command on
    /// the right, and returns the events actually produced.
    /// (In other languages a plain two-argument function does the same job;
    /// the infix operator just makes the test more visual.)
    let (=>) events cmd =
        events
        // rebuild the current state by applying evolve to every past event,
        // starting from the initial state
        |> List.fold evolve initialState
        // then ask the decider what the command produces in that state
        |> decide cmd

    /// "Expect" (read "equal bang", or assert): compares the value on the
    /// left with the value on the right and prints the outcome.
    /// With a testing framework, use its Assert.Equal function instead.
    let (=!) actual expected =
        match actual = expected with
        | true -> printfn "✅"
        | false ->
            printfn "❌"
            printfn "actual: %A" actual
            printfn "expected: %A" expected

    // building a bulb from the initial state works
    [] => Build 5 =! [ Built 5 ]

    // building a bulb that is already built does nothing
    [ Built 3 ] => Build 5 =! []

    // a freshly built bulb is off: switching it on works
    [ Built 5 ] => SwitchOn =! [ SwitchedOn ]

    // a freshly built bulb is off: switching it off does nothing
    [ Built 5 ] => SwitchOff =! []

    // once switched on, switching on again does nothing
    [ Built 5; SwitchedOn ] => SwitchOn =! []

    // once switched on, switching off works
    [ Built 5; SwitchedOn ] => SwitchOff =! [ SwitchedOff ]

    // after being switched on the maximum number of times (here 2),
    // switching on again breaks the bulb
    [ Built 2; SwitchedOn; SwitchedOff; SwitchedOn; SwitchedOff ]
    => SwitchOn
    =! [ Broke ]

    // once broken, switching on does nothing
    [ Built 1; SwitchedOn; SwitchedOff; Broke ]
    => SwitchOn
    =! []

    // once broken, switching off does nothing
    [ Built 1; SwitchedOn; SwitchedOff; Broke ]
    => SwitchOff
    =! []
// The following modules demonstrate how to use a decider in
// different settings:
// * without persistence
// * with state persistence
// * with events persistence
// They are presented in order of complexity;
// use the simplest one that works for your case.
/// The simplest way to run a decider: keep its state in one global
/// mutable variable.
module ``Global mutable state`` =
    // When writing a new system this lets you check that the logic is
    // correct before implementing any persistence. The technique is also
    // usable in test or dev environments to validate the design.
    let mutable state = initialState

    /// The command handler: decides what happens based on the current
    /// state, stores the new state in the global variable, and returns
    /// the events.
    let handle cmd =
        let produced = decide cmd state
        state <- (state, produced) ||> List.fold evolve
        produced

    // sample commands to try it (execute them in interactive mode)
    handle (Build 5)
    handle SwitchOn
    handle SwitchOff

    // In a complete application the web API would call handle with the
    // received command, get the resulting events, and apply side effects
    // accordingly.
    // Here, the identity is the global mutable variable!
/// Keeps the state in memory, one instance per bulb, to lift the
/// single-instance limitation of a global state.
module ``State in memory`` =
    /// Creates a new bulb and returns its command handler.
    /// The state lives in a cell captured by the returned closure, so each
    /// call to bulb yields an independent instance.
    let bulb () =
        let current = ref initialState

        fun cmd ->
            let produced = decide cmd current.Value
            current.Value <- List.fold evolve current.Value produced
            produced

    // sample commands to try it
    let b1 = bulb ()
    b1 (Build 5)
    b1 SwitchOn
    b1 SwitchOff
    // here, the identity is the function instance

    /// The same idea in object form.
    type Bulb() =
        // the state is held in a mutable instance field
        let mutable state = initialState

        /// Handles a command against this instance's state and returns
        /// the resulting events.
        member self.Handle(cmd) =
            let produced = decide cmd state
            state <- (state, produced) ||> List.fold evolve
            produced

    // sample commands to try it
    let b2 = Bulb()
    b2.Handle(Build 5)
    b2.Handle(SwitchOn)
    b2.Handle(SwitchOff)
    // in this case, the identity is the object reference
// Now, to persist the state, we need to serialize and deserialize it.
// Of course, you can use a JSON serializer to do it. But don't forget
// to convert your domain types to DTOs before serialization: you don't
// want to bloat your domain code with JSON attributes/annotations,
// and you want an adaptation layer to be able to refactor your state
// without breaking everything...
// And don't forget the power of simplicity!
/// Plain-text serialization of the State type.
module State =
    /// Turns a state into its textual form: "NotBuilt", "Broken",
    /// or the bulb position followed by a space and the remaining uses.
    let serialize (state: State) =
        match state with
        | NotBuilt -> "NotBuilt"
        | Broken -> "Broken"
        | Working(position, uses) ->
            let label =
                match position with
                | On -> "On"
                | Off -> "Off"

            $"{label} {uses}"

    /// Parses the textual form produced by serialize.
    /// Fails with "Unknown state" when the text has an unexpected shape.
    let deserialize (s: string) =
        match List.ofArray (s.Split(' ')) with
        | [ "NotBuilt" ] -> NotBuilt
        | [ "Broken" ] -> Broken
        | [ "On"; uses ] -> Working(On, int uses)
        | [ "Off"; uses ] -> Working(Off, int uses)
        | _ -> failwith "Unknown state"
// We will use eskv, my event store and key value store
// for educational purpose.
// you can install the eskv with the following command:
// $ dotnet tool install eskv --global
// you can then run it by typing
// $ eskv
// launch your favorite browser on http://localhost:5000 to access the UI.
// We load the eskv.client nuget (https://www.nuget.org/packages/eskv.client)
#r "nuget: eskv.client"
open eskv
// Instantiate the eskv client shared by all the persistence modules below;
// it talks to eskv through its HTTP API.
// NOTE(review): presumably the parameterless constructor targets the local
// eskv instance (http://localhost:5000) — confirm against eskv.client.
let client = EskvClient()
/// Loads and saves the state in the key value store on every call.
module ``Load and save state in Key Value store`` =
    /// Loads the state stored under the given id (the key in the key value
    /// store), or the initial state when the key does not exist.
    let load id =
        let stored = client.TryLoad(id)

        match stored.KeyExists with
        | true -> State.deserialize stored.Value
        | false -> initialState

    /// Saves the state in the key value store under the given key.
    let save id state =
        let payload = State.serialize state
        client.Save(id, payload)

    /// Returns a command handler bound to the given identity. On each
    /// command it loads the state, calls decide to get the events, computes
    /// the new state from them, saves it, and returns the events.
    let bulb id =
        fun cmd ->
            let before = load id
            let produced = decide cmd before
            produced |> List.fold evolve before |> save id
            produced

    // This version works in "last write wins" mode.
    // The window between load and save should be very short, but a race
    // with a concurrent execution can still happen: under high load with
    // load balancing, two instances can handle two different commands at
    // the same time. Both load the same state and compute events and a new
    // state; one writes first, and the second overwrites it...
    // However, this is how most systems are written today; keep doing it
    // this way if your risk of concurrency is low.

    // sample commands to try it
    let b1 = bulb "b1"
    b1 (Build 5)
    b1 SwitchOn
    b1 SwitchOff
/// Same as load/save on each call, but protected against concurrent writes.
module ``Load and save state in key value store with optimistic conflict detection`` =
    // Under high load, with a high risk of concurrency, we can protect
    // against concurrent writes. For this, eskv returns a document ETag
    // when loading; passing it back when saving checks that the document
    // has not been modified in between. A null ETag checks that the key
    // has not been created yet.

    /// Loads the ETag and the state from eskv, or returns a null ETag and
    /// the initial state when the key does not exist.
    let load id =
        let stored = client.TryLoad(id)

        match stored.KeyExists with
        | true -> stored.ETag, State.deserialize stored.Value
        | false -> null, initialState

    /// Tries to save the state while checking that the ETag has not changed.
    /// eskv's TrySave returns null when the operation fails, so the result
    /// is true on success and false on conflict.
    let trySave id etag state =
        let payload = State.serialize state

        match client.TrySave(id, payload, etag) with
        | null -> false
        | _ -> true

    /// Returns a command handler bound to the given identity.
    /// This version simply raises an exception when a conflict is detected;
    /// the caller can retry the command later.
    let bulb id =
        fun cmd ->
            let etag, before = load id
            let produced = decide cmd before
            let after = List.fold evolve before produced

            match trySave id etag after with
            | true -> produced
            | false -> failwith "⚡ Conflict"

    // sample commands to try it
    let b2 = bulb "b2"
    b2 (Build 5)
    b2 SwitchOn
    b2 SwitchOff
/// Optimistic conflict detection where the handler itself retries the
/// command on the fresh state instead of leaving the retry to the caller.
module ``Load and save state in key value store with optimistic conflict detection and retry`` =
    /// Loads the ETag and the state, or a null ETag and the initial state
    /// when the key does not exist (same as the non-retrying version).
    let load id =
        let stored = client.TryLoad(id)

        match stored.KeyExists with
        | true -> stored.ETag, State.deserialize stored.Value
        | false -> null, initialState

    /// Tries to save the state guarded by the ETag; true on success
    /// (same as the non-retrying version).
    let trySave id etag state =
        let payload = State.serialize state

        match client.TrySave(id, payload, etag) with
        | null -> false
        | _ -> true

    /// Returns a command handler bound to the given identity.
    /// The handler is a recursive function: when the save succeeds it
    /// returns the events, otherwise it calls itself to reload the state
    /// and try again from that fresh state.
    let bulb id =
        let rec attempt cmd =
            let etag, before = load id
            let produced = decide cmd before
            let after = List.fold evolve before produced

            match trySave id etag after with
            | true -> produced
            | false -> attempt cmd

        attempt

    // Retrying automatically is not always the best idea: if the caller is
    // taking risky actions, it can be better to fail and tell them that the
    // situation has changed.

    // sample commands to try it
    let b3 = bulb "b3"
    b3 (Build 5)
    b3 SwitchOn
    b3 SwitchOff
/// Keeps the last known state in memory to avoid reloading it on each call,
/// while still saving with an ETag check and retrying on conflict.
module ``Load and save, keep state in memory with optimistic concurrency and retry`` =
    /// The ETag together with the state it was loaded or saved with.
    type EtagState = { Etag: string; State: State }

    /// Loads the ETag and the state as an EtagState; a missing key yields
    /// a null ETag and the initial state.
    let load id =
        let stored = client.TryLoad(id)

        match stored.KeyExists with
        | true ->
            { Etag = stored.ETag
              State = State.deserialize stored.Value }
        | false -> { Etag = null; State = initialState }

    /// Tries to save the state guarded by the ETag and returns the raw
    /// result of eskv's TrySave; the handler below treats null as a
    /// conflict and any other value as the new ETag.
    let trySave id etag state =
        let payload = State.serialize state
        client.TrySave(id, payload, etag)

    /// Returns a command handler bound to the given identity.
    let bulb id =
        // on creation, load the ETag and the state into an in-memory cell
        let cache = ref (load id)

        // recursive function that retries the command from the cached state
        let rec attempt cmd =
            let known = cache.Value
            // decide, then compute the new state
            let produced = decide cmd known.State
            let after = List.fold evolve known.State produced

            // save, checking the ETag associated with the last known state
            match trySave id known.Etag after with
            | null ->
                // the state has changed in the db: reload it and retry
                cache.Value <- load id
                attempt cmd
            | freshEtag ->
                // saved: refresh the in-memory cell
                cache.Value <- { Etag = freshEtag; State = after }
                produced

        attempt

    // This version is faster only if commands tend to be routed to the same
    // instance; otherwise the first call to trySave fails most of the time
    // and a reload is needed anyway.
    // You can place the service behind a proxy using sticky sessions to send
    // commands to the same instance. If a command is nevertheless routed to
    // a different instance this code is still safe: it first tries to save,
    // fails, then reloads and succeeds, so the result is always correct.

    // sample commands to try it
    let b4 = bulb "b4"
    b4 (Build 5)
    b4 SwitchOn
    b4 SwitchOff
// In each of the versions above, it is possible to additionally save
// the events for auditing.
// Events are usually persisted using an EventType and additional data.
// Here, only the Built event has extra data.
/// Serialization of events as an event type name plus a data string.
module Event =
    /// Returns the (event type, data) pair for an event.
    /// Only Built carries data (its number of uses); the others use "".
    let serialize (event: Event) =
        let withoutData eventType = eventType, ""

        match event with
        | Built uses -> "Built", string uses
        | SwitchedOn -> withoutData "SwitchedOn"
        | SwitchedOff -> withoutData "SwitchedOff"
        | Broke -> withoutData "Broke"

    /// Turns a stored (event type, data) pair back into domain events.
    /// The result is a list: usually a single event, but a list makes it
    /// possible to discard a stored event (empty list, as done here for
    /// unknown types) or to split one stored event into two domain events
    /// when the design has changed.
    let deserialize (typ: string, data: string) =
        match typ with
        | "Built" -> List.singleton (Built(int data))
        | "SwitchedOn" -> List.singleton SwitchedOn
        | "SwitchedOff" -> List.singleton SwitchedOff
        | "Broke" -> List.singleton Broke
        | _ -> List.empty
/// First event-sourced version: the state is rebuilt from the stored
/// events on every command, and new events are appended to the stream.
module ``Event sourcing`` =
    /// Reads the stream from the start and deserializes every stored event,
    /// concatenating whatever Event.deserialize returns for each of them.
    let load id =
        let slice = client.ReadStreamForward(id, 0)

        List.concat [ for stored in slice.Events -> Event.deserialize (stored.EventType, stored.Data) ]

    /// Serializes the events and appends them to the stream.
    let append id events =
        let toRecord event =
            let eventType, payload = Event.serialize event
            { EventType = eventType; Data = payload }

        let records = [ for event in events -> toRecord event ]
        client.Append(id, records)

    /// Returns a command handler bound to the given stream id. On each
    /// command it loads all events, folds them into the state, calls decide
    /// to get the new events, and appends them at the end of the stream.
    let bulb id =
        fun cmd ->
            let produced =
                load id
                |> List.fold evolve initialState
                |> decide cmd

            append id produced
            produced

    // Folding is very fast even with many events, since the evolve function
    // should be small. For a very large number of events, reading and
    // deserializing them takes far longer than the fold.
    let b1 = bulb "b1"
    b1 (Build 5)
    b1 SwitchOn
    b1 SwitchOff
/// Event sourcing with a concurrency check based on the stream version.
module ``Event sourcing with optimistic concurrency check`` =
    // The plain event-sourced version does no concurrency check. Depending
    // on the concurrency risk and the domain this is not always a problem,
    // but when it is, the number of the last event (the expected version)
    // can be used to detect conflicts.

    /// Reads the stream from the start and returns the expected version of
    /// the stream (corresponding to its last event) with the events.
    let load id =
        let slice = client.ReadStreamForward(id, 0)

        let history =
            List.concat [ for stored in slice.Events -> Event.deserialize (stored.EventType, stored.Data) ]

        slice.ExpectedVersion, history

    /// Appends the events; succeeds only if the last event in the stream is
    /// still at the expected version. Returns whether it succeeded.
    let tryAppend id expectedVersion events =
        let toRecord event =
            let eventType, payload = Event.serialize event
            { EventType = eventType; Data = payload }

        let records = [ for event in events -> toRecord event ]
        client.TryAppend(id, expectedVersion, records).Success

    /// Returns a command handler bound to the given stream id.
    /// The version obtained when loading is passed back when appending, to
    /// check that no events were appended in the meantime; on conflict the
    /// handler raises an exception.
    let bulb id =
        fun cmd ->
            let version, history = load id

            let produced =
                history
                |> List.fold evolve initialState
                |> decide cmd

            match tryAppend id version produced with
            | true -> produced
            | false -> failwith "⚡ conflict"

    let b2 = bulb "b2"
    b2 (Build 5)
    b2 SwitchOn
    b2 SwitchOff
/// Event sourcing with a version check, retrying the command on conflict.
module ``Event sourcing with optimistic concurrency check and retry`` =
    /// Reads the stream from the start and returns its expected version
    /// with the events (same as the non-retrying version).
    let load id =
        let slice = client.ReadStreamForward(id, 0)

        let history =
            List.concat [ for stored in slice.Events -> Event.deserialize (stored.EventType, stored.Data) ]

        slice.ExpectedVersion, history

    /// Appends the events using TryAppendOrRead.
    /// When the expected version is correct the call succeeds and this
    /// returns Ok. When it does not match, the events appended to the stream
    /// since that version come back directly; they are returned with the
    /// new expected version in an Error.
    let tryAppendOrRead id expectedVersion events =
        let toRecord event =
            let eventType, payload = Event.serialize event
            { EventType = eventType; Data = payload }

        let records = [ for event in events -> toRecord event ]
        let outcome = client.TryAppendOrRead(id, expectedVersion, records)

        match outcome.Success with
        | true -> Ok()
        | false ->
            let missed =
                List.concat [ for stored in outcome.NewEvents -> Event.deserialize (stored.EventType, stored.Data) ]

            Error(outcome.ExpectedVersion, missed)

    /// Returns a command handler bound to the given stream id.
    /// As with the state-based versions a recursive function is used; here
    /// the version and state are passed as parameters so that nothing has
    /// to be reloaded between attempts.
    let bulb id =
        fun cmd ->
            let rec attempt version state =
                // ask the decider for the new events
                let produced = decide cmd state

                // try to append them to the stream
                match tryAppendOrRead id version produced with
                | Ok() ->
                    // appended: return the events
                    produced
                | Error(latestVersion, missed) ->
                    // conflict, but we received the events we missed: fold
                    // them to get the last known state, then retry
                    attempt latestVersion (List.fold evolve state missed)

            // entry point: load events and version, compute the state,
            // then start the first attempt
            let version, history = load id
            attempt version (List.fold evolve initialState history)

    // This version avoids errors due to conflicts but, as explained before,
    // whether that is desirable depends on the domain.
    let b3 = bulb "b3"
    b3 (Build 5)
    b3 SwitchOn
    b3 SwitchOff
/// Last version: event sourcing with a version check and retry, keeping
/// the state and version in memory between commands.
module ``Event sourcing with optimistic concurreny check and retry with in memory state`` =
    /// Reads the stream from the start and returns its expected version
    /// with the events (same as the other version-checking variants).
    let load id =
        let slice = client.ReadStreamForward(id, 0)

        let history =
            List.concat [ for stored in slice.Events -> Event.deserialize (stored.EventType, stored.Data) ]

        slice.ExpectedVersion, history

    /// Appends the events using TryAppendOrRead.
    /// Unlike the stateless retry variant, the expected version is also
    /// returned on success, since it has changed with the newly appended
    /// events. On conflict, returns the new expected version and the missed
    /// events in an Error.
    let tryAppendOrRead id expectedVersion events =
        let toRecord event =
            let eventType, payload = Event.serialize event
            { EventType = eventType; Data = payload }

        let records = [ for event in events -> toRecord event ]
        let outcome = client.TryAppendOrRead(id, expectedVersion, records)

        match outcome.Success with
        | true -> Ok outcome.ExpectedVersion
        | false ->
            let missed =
                List.concat [ for stored in outcome.NewEvents -> Event.deserialize (stored.EventType, stored.Data) ]

            Error(outcome.ExpectedVersion, missed)

    /// The state together with the stream version it corresponds to.
    type StateVersion = { State: State; Version: int }

    /// Returns a command handler bound to the given stream id.
    let bulb id =
        // load version and events once, and keep the computed state in an
        // in-memory cell
        let version, history = load id

        let current =
            ref
                { State = List.fold evolve initialState history
                  Version = version }

        fun cmd ->
            let rec attempt () =
                // the state is already in memory: just call decide with it
                let known = current.Value
                let produced = decide cmd known.State

                // try to save the events
                match tryAppendOrRead id known.Version produced with
                | Ok newVersion ->
                    // appended: remember the new state and version
                    current.Value <-
                        { State = List.fold evolve known.State produced
                          Version = newVersion }

                    produced
                | Error(newVersion, missed) ->
                    // conflict, but we got the missed events and the new
                    // version: fold the missed events into the state and retry
                    current.Value <-
                        { State = List.fold evolve known.State missed
                          Version = newVersion }

                    attempt ()

            attempt ()

    // When there is no conflict, this version only calls tryAppendOrRead
    // for a command and is then ready for the next one. This is really fast.
    let b4 = bulb "b4"
    b4 (Build 5)
    b4 SwitchOn
    b4 SwitchOff
// I still advise to take the simplest version that works for you.
// start with load/save state (and maybe log events), if you don't need more
// use event sourcing once you get comfortable with it, and need more advanced scenarios.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment