Skip to content

Instantly share code, notes, and snippets.

@jjvdangelo
Last active August 29, 2015 13:56
Show Gist options
  • Save jjvdangelo/9340630 to your computer and use it in GitHub Desktop.
Save jjvdangelo/9340630 to your computer and use it in GitHub Desktop.
/// A minimal counter domain: commands are turned into events by `handle`,
/// and events are folded into a `State` by `apply`.
module Counter =

    /// The state of a counter: a single integer value.
    type State =
        { Value: int }
        /// The starting state, used as the seed when folding events.
        static member Zero = { Value = 0 }

    /// Requests that can be made of a counter.
    type Command =
        | Increment
        | Decrement

    /// Facts recorded after a command has been handled.
    type Event =
        | Incremented
        | Decremented

    /// Applies a single event to `state` and returns the resulting state.
    /// The event is the last (implicit) argument so that `apply` can be
    /// passed directly to a fold over a sequence of events.
    let apply state =
        function
        | Incremented ->
            { Value = state.Value + 1 }
        | Decremented ->
            { Value = state.Value - 1 }

    /// Decides which event a command produces given the current `state`.
    /// `state` is not inspected yet; it is accepted so that domain rules
    /// can be added without changing the signature.
    let handle state =
        function
        | Increment ->
            // Do our domain logic in here
            Incremented
        | Decrement ->
            // ...and in here
            Decremented
/// Command handler that connects the `Counter` domain to an NEventStore
/// event stream.
module CounterHandler =
    open System // Int32 and Guid are used below
    open NEventStore

    /// Seed state used when folding a stream's committed events.
    let zero = Counter.State.Zero

    /// Handles one command envelope against the given event store.
    ///
    /// `cmd` is a record with `Data` and `Body` fields. When `Body` is a
    /// `Counter.Command`, the stream identified by `data.EntityId` is opened,
    /// its committed events are folded into the current state, the command is
    /// handled, and the resulting event is added to the stream and committed.
    /// Any other body is ignored.
    let createHandler (bus: IStoreEvents) cmd =
        let { Data = data; Body = body } = cmd // Decompose our little command record
        match body with
        | :? Counter.Command as command ->
            use stream = bus.OpenStream(data.EntityId, 0, Int32.MaxValue)
            let state =
                stream.CommittedEvents               // Grab the already-committed events
                |> Seq.map (fun msg -> msg.Body)     // Get the actual event from NEventStore
                |> Seq.cast<Counter.Event>           // Cast them to our expected event type
                |> Seq.fold Counter.apply zero       // Left-fold the events with our apply
                                                     // function to get the last known
                                                     // state of our entity
            // Pass the decoded command (not the envelope `cmd`) to the domain,
            // using the current state, to get an event
            let e = command |> Counter.handle state
            // Turn the event into a message for NEventStore
            let msg = EventMessage(Body = e)
            // Add it to the stream
            msg |> stream.Add
            // And commit the changes
            stream.CommitChanges(Guid.NewGuid())
        | _ -> () // Ignore commands we don't understand
/// Wiring for the NEventStore event store.
module EventBusStartup =
    open NEventStore
    open NEventStore.Persistence.InMemoryPersistence
    open NEventStore.Persistence.SqlPersistence.SqlDialects
    // ...

    /// Builds the event store, dispatching commits to a dispatcher created
    /// from `handlers` by `createDispatcher`.
    let createEventStore handlers =
        let commitDispatcher = createDispatcher handlers

        // Logging and persistence stage.
        // NOTE(review): in-memory persistence is configured and then SQL
        // persistence ("default") right after it; presumably only one of the
        // two is intended for a given environment -- confirm.
        let persistence =
            Wireup.Init()
                .LogToConsoleWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("default")
                .WithDialect(MsSqlDialect())

        // Serialization stage: custom serializer plus compression.
        let serialization =
            persistence
                .UsingCustomSerialization(Serializer.createSerializer())
                .Compress()

        // Dispatch stage, then build the store.
        serialization
            .UsingAsynchronousDispatchScheduler()
            .DispatchTo(commitDispatcher)
            .Build()
/// Commit dispatching for the NEventStore event store.
module EventBusStartup =
    open NEventStore
    open NEventStore.Dispatcher
    // ...

    /// Wraps a list of handler functions in an `IDispatchCommits`.
    /// On dispatch, every handler is called in list order with the commit's
    /// `StreamId` and `Events`. `Dispose` does nothing.
    let createDispatcher handlers =
        { new IDispatchCommits with
            member __.Dispatch commit =
                let invoke handler = handler commit.StreamId commit.Events
                List.iter invoke handlers
            member __.Dispose() = () }
open Simple.Web.StructureMap
/// Startup type that builds the event store and the command router and
/// registers the router with the container.
type DomainStartup() =
    inherit StructureMapStartupBase()

    // Event store built from the list of event handler functions.
    let eventBus =
        [ MyDomain.createEventHandler1;
          MyDomain.createEventHandler2;
          MyDomain.createEventHandler3 ]
        |> EventBusStartup.createEventStore

    // Command router built from the command handler functions, each of
    // which is first given the event store.
    let commandRouter =
        [ MyDomain.createHandler1;
          MyDomain.createHandler2;
          MyDomain.createHandler3 ]
        |> List.map (fun handler -> handler eventBus) // Apply the bus to our command handler functions
        |> CommandBus.createCommandRouter

    /// Registers the command router as a singleton.
    // NOTE(review): `typedefof` yields the open generic definition of
    // IRouter; `typeof<IRouter<Command>>` may be what is intended -- confirm.
    override __.Configure(config) =
        config.For(typedefof<IRouter<Command>>).Singleton().Use(commandRouter) |> ignore
/// Complete wiring for the NEventStore event store: serializer, commit
/// dispatcher and the store itself.
module EventBusStartup =
    open NEventStore
    open NEventStore.Dispatcher
    open NEventStore.Persistence.InMemoryPersistence
    open NEventStore.Persistence.SqlPersistence.SqlDialects

    /// FsPickler-backed serializer for NEventStore.
    [<RequireQualifiedAccess>]
    module Serializer =
        open FsPickler
        open NEventStore.Serialization

        /// Creates an `ISerialize` whose two members forward to a single
        /// `FsPickler` instance created here.
        let createSerializer() =
            let fsPickler = FsPickler()
            { new ISerialize with
                member __.Serialize<'T> (target, value: 'T) =
                    fsPickler.Serialize(target, value)
                member __.Deserialize<'T> source =
                    fsPickler.Deserialize<'T>(source) }

    /// Wraps a list of handler functions in an `IDispatchCommits`.
    /// On dispatch, every handler is called in list order with the commit's
    /// `StreamId` and `Events`. `Dispose` does nothing.
    let createDispatcher handlers =
        { new IDispatchCommits with
            member __.Dispatch commit =
                let invoke handler = handler commit.StreamId commit.Events
                List.iter invoke handlers
            member __.Dispose() = () }

    /// Builds the event store, dispatching commits to a dispatcher created
    /// from `handlers`.
    let createEventStore handlers =
        let commitDispatcher = createDispatcher handlers

        // Logging and persistence stage.
        // NOTE(review): in-memory persistence is configured and then SQL
        // persistence ("default") right after it; presumably only one of the
        // two is intended for a given environment -- confirm.
        let persistence =
            Wireup.Init()
                .LogToConsoleWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("default")
                .WithDialect(MsSqlDialect())

        // Serialization stage: custom serializer plus compression.
        let serialization =
            persistence
                .UsingCustomSerialization(Serializer.createSerializer())
                .Compress()

        // Dispatch stage, then build the store.
        serialization
            .UsingAsynchronousDispatchScheduler()
            .DispatchTo(commitDispatcher)
            .Build()
/// Event-side handler for `Counter` events (the place to update projections).
module CounterEventHandler =
    open NEventStore

    /// Reacts to a single counter event for the entity `id`.
    /// Both cases are placeholders that currently do nothing; each arm
    /// returns unit so that the match is a complete expression.
    let handle id =
        function
        | Counter.Incremented ->
            // Increment our projection
            ()
        | Counter.Decremented ->
            // Decrement our projection
            ()

    /// Passes the body of every message in `evts` that is a `Counter.Event`
    /// to `handle`, together with `id`. Other bodies are skipped.
    let createEventHandler id (evts: ResizeArray<EventMessage>) =
        evts
        |> Seq.map (fun msg -> msg.Body)
        |> Seq.iter (fun e ->
            match box e with
            | :? Counter.Event as e ->
                e |> handle id
            | _ -> ()) // Ignore events we don't care about
open Simple.Web.StructureMap
/// Startup type that builds the event store and the command router for the
/// counter domain and registers the router with the container.
type DomainStartup() =
    inherit StructureMapStartupBase()

    // Event store built from the list of event handler functions.
    let eventBus =
        [ CounterEventHandler.createEventHandler ] // Put all the event handler functions in this list
        |> EventBusStartup.createEventStore

    // Command router built from the command handler functions, each of
    // which is first given the event store.
    let commandRouter =
        [ CounterHandler.createHandler ] // ...and the command handler functions in this list
        |> List.map (fun handler -> handler eventBus) // Apply the bus to our command handler functions
        |> CommandBus.createCommandRouter

    /// Registers the command router as a singleton.
    // NOTE(review): `typedefof` yields the open generic definition of
    // IRouter; `typeof<IRouter<Command>>` may be what is intended -- confirm.
    override __.Configure(config) =
        config.For(typedefof<IRouter<Command>>).Singleton().Use(commandRouter) |> ignore
/// Serializer wiring for the NEventStore event store.
module EventBusStartup =
    open NEventStore

    /// FsPickler-backed serializer for NEventStore.
    [<RequireQualifiedAccess>]
    module Serializer =
        open FsPickler
        open NEventStore.Serialization

        /// Creates an `ISerialize` whose two members forward to a single
        /// `FsPickler` instance created here.
        let createSerializer() =
            let fsPickler = FsPickler()
            { new ISerialize with
                member __.Serialize<'T> (target, value: 'T) =
                    fsPickler.Serialize(target, value)
                member __.Deserialize<'T> source =
                    fsPickler.Deserialize<'T>(source) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment