Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Dump of ProtoBuf serialization spike for http://github.com/bartelink/FunDomain
module FunDomain.Persistence.NEventStore.NesGateway
open FunDomain.Persistence.Serialization
open NEventStore
open NEventStore.Persistence
open NEventStore.Persistence.Sql.SqlDialects
open Microsoft.FSharp.Reflection
open System
open System.Collections.Generic
/// Opaque token yielded by StreamerProtobuf.read and consumed by StreamerProtobuf.append.
/// Holds the CommitSequence and StreamRevision of the last commit that was read;
/// append derives the values for the next commit from it.
type Token = { CommitSequence : int; StreamRevision : int}
/// Identifier of a stream in NEventStore.
/// Bucket is optional; when it is None the streamer falls back to the "default" bucket.
type StreamId = { Bucket: string option; StreamId: string}
/// NEventStore ISerialize implementation that forwards both directions
/// to protobuf-net's static Serializer.
type ProtobufSerializer() =
    interface NEventStore.Serialization.ISerialize with
        /// Writes `graph` to the `output` stream using protobuf-net.
        member __.Serialize<'T>(output, graph) =
            ProtoBuf.Serializer.Serialize<'T>(output, graph)
        /// Reads a value of type 'T from the `input` stream using protobuf-net.
        member __.Deserialize<'T> input =
            ProtoBuf.Serializer.Deserialize<'T>(input)
/// Helpers for (de)serializing F# discriminated unions with protobuf-net.
module ProtoBufAdapter =
    open ProtoBuf
    open ProtoBuf.Meta

    /// Deserializes a value of type 'a from a protobuf-encoded byte array.
    let deserializeUnion<'a> (data:byte[]) =
        // Bind with `use` so the stream is disposed deterministically, as serializeUnion does.
        use stream = new IO.MemoryStream(data)
        Serializer.Deserialize<'a>(stream)

    /// Serializes `o` with protobuf-net and returns the encoded bytes.
    let serializeUnion (o:'a) =
        use stream = new IO.MemoryStream()
        Serializer.Serialize<'a>(stream, o)
        stream.ToArray()

    /// Registers every case of the union 'TMessage in `model` as a subtype of the
    /// union's own type, then compiles the union's meta type in place.
    let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
        let baseType = model.[typeof<'TMessage>]
        for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
            // The CLR type for a case is looked up as a nested type named after the case.
            // NOTE(review): presumably cases without fields have no such nested type, which
            // would make caseType null here - confirm before registering field-less unions.
            let caseType = case.Name |> case.DeclaringType.GetNestedType
            // Sub-type field numbers are the case tag offset by 1000.
            baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
            let caseTypeModel = model.[caseType]
            // Map the member named "item" and tell protobuf-net not to run a constructor.
            caseTypeModel.Add("item").UseConstructor <- false
        baseType.CompileInPlace()

    /// Registers 'TMessage in protobuf-net's default runtime model.
    let registerSerializableDu<'TMessage> () = registerSerializableDuInModel<'TMessage> RuntimeTypeModel.Default
/// Wrapper yielded by create* functions with read/append functions matching FunDomain.CommandHandler requirements
type StreamerProtobuf private (inner') =
    // Hoop jumping a la C++ pimpl pattern - if we don't do this, we're foisting an NEventStore package reference on all downstream users
    let inner : IPersistStreams = unbox inner'
    /// Substitutes the "default" bucket when no bucket was specified.
    let defaultBucket bucketId = defaultArg bucketId "default"
    /// Fetches the commits of a stream between the given revisions from the persistence engine.
    let load {Bucket=bucketId; StreamId=streamId} minRevision maxRevision =
        inner.GetFrom(bucketId |> defaultBucket, streamId, minRevision, maxRevision)
    /// Persists a commit attempt, discarding the value returned by the persistence engine.
    let commit = inner.Commit >> ignore
    /// Loads the requested revision range and yields the deserialized events together with
    /// a Token describing the last commit seen (None when the stream has no commits).
    let readStream streamId startIndex count =
        let minRevision,maxRevision = startIndex,startIndex+count-1
        async {
            let commits =
                load streamId minRevision maxRevision
                |> Array.ofSeq
            let events =
                commits
                |> Seq.collect (fun ev -> ev.Events)
                |> Seq.map (fun em -> em.Body |> unbox |> ProtoBufAdapter.deserializeUnion)
                |> List.ofSeq
            let tokenOption =
                if commits.Length = 0 then
                    None
                else
                    let lastCommit = commits |> Seq.last
                    Some {CommitSequence=lastCommit.CommitSequence; StreamRevision=lastCommit.StreamRevision}
            return events, tokenOption, None }
    /// Serializes the events and commits them as the next commit of the stream.
    /// `token` is the Token obtained from the preceding read (None for a new stream).
    let appendToStream {Bucket=bucketId; StreamId=streamId} streamMeta token events =
        let commitId,commitStamp,commitHeaders = streamMeta
        async {
            // Materialized once so the events can be counted before being handed to the commit.
            let eventMessages =
                events
                |> Seq.map (fun event ->
                    let body = event |> ProtoBufAdapter.serializeUnion |> box
                    EventMessage(Body=body))
                |> Array.ofSeq
            // NEventStore's StreamRevision is the revision of the last event in the commit, so it
            // must advance by the number of events rather than by one per commit.
            // An empty batch keeps advancing by one, exactly as before.
            let revisionStep = max 1 eventMessages.Length
            let updatedStreamRevision=token |> Option.map (fun token -> token.StreamRevision+revisionStep)
            let updatedCommitSequence=token |> Option.map (fun token -> token.CommitSequence+1)
            let attempt =
                CommitAttempt(
                    bucketId |> defaultBucket, streamId,
                    updatedStreamRevision |> defaultArg <| revisionStep,
                    commitId,
                    updatedCommitSequence |> defaultArg <| 1,
                    commitStamp,
                    commitHeaders,
                    eventMessages)
            commit attempt }
    /// Boxes the persister so the constructor signature does not mention NEventStore types.
    static member internal wrap persister = StreamerProtobuf( box persister)
    /// Synchronously reads the stream from revision 0 and returns (token option, events).
    member this.read stream =
        let events,version,_ =
            readStream stream 0 Int32.MaxValue
            |> Async.RunSynchronously
        version,events
    /// Synchronously appends `events` to `stream` as a new commit following `token`.
    /// A fresh commit id and a UTC timestamp are generated; no headers are attached.
    member this.append stream token events =
        let commitMetadata() =
            let commitId = Guid.NewGuid()
            let commitDateTime = DateTime.UtcNow
            let commitHeaders = null
            commitId,commitDateTime,commitHeaders
        let metadata = commitMetadata()
        appendToStream stream metadata token events
        |> Async.RunSynchronously
/// Builds a StreamerProtobuf on top of the persistence engine exposed by
/// the given store's Advanced property.
let createFromStoreProtobuf (inner:IStoreEvents) =
    let persister = inner.Advanced
    StreamerProtobuf.wrap persister
/// Wires up an NEventStore with in-memory persistence and the protobuf-net
/// serializer, and wraps it in a StreamerProtobuf.
let createInMemoryProtobuf () =
    let protobufSerializer = ProtobufSerializer()
    let store =
        Wireup.Init()
            .LogToOutputWindow()
            .UsingInMemoryPersistence()
            .UsingCustomSerialization(protobufSerializer)
            .Build()
    createFromStoreProtobuf store
module Scenarios
open FunUno.UnoGame // Commands, replay, handle
open FunUno.UnoGame.Events // Digit
open FunDomain // CommandHandler
open FunDomain.Persistence.NEventStore.NesGateway // createInMemory, StreamId
open Xunit
/// The command sequence for one complete round of the sample game:
/// a StartGame followed by five PlayCard commands for players 0-3 and 0 again.
let fullGameActions gameId =
    // Every PlayCard shares the game id; only the player and the card vary.
    let play player card = PlayCard { GameId=gameId; Player=player; Card=card }
    [ StartGame { GameId=gameId; PlayerCount=4; TopCard=Digit(3, Red) }
      play 0 (Digit(3, Blue))
      play 1 (Digit(8, Blue))
      play 2 (Digit(8, Yellow))
      play 3 (Digit(4, Yellow))
      play 0 (Digit(4, Green)) ]
/// Maps a game id onto a StreamId with no explicit bucket, using the id's string form as the stream name.
let streamId gameId = { Bucket = None; StreamId = string gameId }
/// Feeds every action of a full round through a command handler that reads from
/// and appends to an in-memory, protobuf-serialized NEventStore.
[<Fact>]
let ``Can run a full round using NEventStore's InMemoryPersistence Protobuf`` () =
    let handler = CommandHandler.create replay handle
    let eventStore = createInMemoryProtobuf()
    let dispatch = handler eventStore.read eventStore.append
    let gameId = 42
    let stream = streamId gameId
    for action in fullGameActions gameId do
        printfn "Processing %A against Stream %A" action stream
        dispatch stream action
/// Registers the game's union types with protobuf-net's default model and
/// then runs the full-round scenarios.
[<Fact>]
let warmup () =
    // The unions must be registered before anything is (de)serialized.
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Event> ()
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Events.Card> ()
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Events.Color> ()
    ``Can run a full round using NEventStore's InMemoryPersistence Protobuf`` ()
    // NOTE(review): the scenario below is not defined in the visible part of this file - confirm it exists.
    ``Can run a full round using NEventStore's InMemoryPersistence`` ()
module FunDomain.Tests.ProtobufNetSerialization
open ProtoBuf
open ProtoBuf.Meta
open Swensen.Unquote
open Xunit
open System.IO
open Microsoft.FSharp.Reflection
/// Sample payload with a string and an optional int.
[<ProtoContract>]
[<CLIMutable>]
type MessageA =
    { [<ProtoMember(1)>] X: string
      [<ProtoMember(2)>] Y: int option }

/// Sample payload with two strings.
[<ProtoContract; CLIMutable>]
type MessageB =
    { [<ProtoMember(1)>] A: string
      [<ProtoMember(2)>] B: string }

/// Union wrapping the two sample payloads.
[<ProtoContract>]
type Message =
    | MessageA of MessageA
    | MessageB of MessageB
/// Serializes `msg` with a Fixed32 length prefix and returns the resulting bytes.
let serialize msg =
    use buffer = new MemoryStream()
    Serializer.SerializeWithLengthPrefix(buffer, msg, PrefixStyle.Fixed32)
    buffer.ToArray()
/// Reads a Fixed32 length-prefixed 'TMessage back out of `bytes`.
let deserialize<'TMessage> (bytes: byte[]) =
    use source = new MemoryStream(bytes)
    Serializer.DeserializeWithLengthPrefix<'TMessage>(source, PrefixStyle.Fixed32)
/// Registers each case of the union 'TMessage in `model` as a subtype of the
/// union's own type, then compiles the union's meta type in place.
let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let unionType = typeof<'TMessage>
    let unionMeta = model.[unionType]
    FSharpType.GetUnionCases(unionType)
    |> Array.iter (fun case ->
        // The CLR type of a case is the nested type carrying the case's name.
        let nestedType = case.DeclaringType.GetNestedType(case.Name)
        // Sub-type field numbers are the case tag offset by 1000.
        unionMeta.AddSubType(1000 + case.Tag, nestedType) |> ignore
        let nestedMeta = model.[nestedType]
        nestedMeta.Add("item").UseConstructor <- false)
    unionMeta.CompileInPlace()
/// Registers 'TMessage in protobuf-net's default runtime model.
let registerSerializableDu<'TMessage> () =
    RuntimeTypeModel.Default |> registerSerializableDuInModel<'TMessage>
// Top-level statement: registers the Message union with the default model.
do registerSerializableDu<Message> ()
/// A MessageA whose string field is null survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with null`` () =
    let original = { X = null; Y = None }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageA whose string field is empty survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with Empty`` () =
    let original = { X = ""; Y = None }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageA carrying Some value in its option field survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with Some`` () =
    let original = { X = "foo"; Y = Some 32 }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageA carrying None in its option field survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with None`` () =
    let original = { X = "foo"; Y = None }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageB survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageB roundtrips`` () =
    let original = { A = "bar"; B = "baz" }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// Both cases of the Message union survive serialize/deserialize unchanged.
[<Fact>]
let ``roundtrip pair`` () =
    // Round-trips one union value and asserts it comes back equal.
    let verify (original: Message) =
        let restored : Message = original |> serialize |> deserialize
        test <@ restored = original @>
    verify (MessageA { X = "foo"; Y = Some 32 })
    verify (MessageB { A = "bar"; B = "baz" })
/// Repeats the union roundtrip 1000 times.
[<Fact>]
let many () =
    [ 1 .. 1000 ] |> List.iter (fun _ -> ``roundtrip pair`` ())
module FunDomain.Tests.ProtobufNetSerializationFacts
open ProtoBuf
open ProtoBuf.Meta
open Swensen.Unquote
open Xunit
open System.IO
open Microsoft.FSharp.Reflection
/// Sample payload with a string and an optional int.
[<ProtoContract>]
[<CLIMutable>]
type MessageA =
    { [<ProtoMember(1)>] X: string
      [<ProtoMember(2)>] Y: int option }

/// Sample payload with two strings.
[<ProtoContract; CLIMutable>]
type MessageB =
    { [<ProtoMember(1)>] A: string
      [<ProtoMember(2)>] B: string }

/// Union wrapping the two sample payloads.
[<ProtoContract>]
type Message =
    | MessageA of MessageA
    | MessageB of MessageB
/// Serializes `msg` with a Fixed32 length prefix and returns the resulting bytes.
let serialize msg =
    use buffer = new MemoryStream()
    Serializer.SerializeWithLengthPrefix(buffer, msg, PrefixStyle.Fixed32)
    buffer.ToArray()
/// Reads a Fixed32 length-prefixed 'TMessage back out of `bytes`.
let deserialize<'TMessage> (bytes: byte[]) =
    use source = new MemoryStream(bytes)
    Serializer.DeserializeWithLengthPrefix<'TMessage>(source, PrefixStyle.Fixed32)
/// Registers each case of the union 'TMessage in `model` as a subtype of the
/// union's own type, then compiles the union's meta type in place.
let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let unionType = typeof<'TMessage>
    let unionMeta = model.[unionType]
    FSharpType.GetUnionCases(unionType)
    |> Array.iter (fun case ->
        // The CLR type of a case is the nested type carrying the case's name.
        let nestedType = case.DeclaringType.GetNestedType(case.Name)
        // Sub-type field numbers are the case tag offset by 1000.
        unionMeta.AddSubType(1000 + case.Tag, nestedType) |> ignore
        let nestedMeta = model.[nestedType]
        nestedMeta.Add("item").UseConstructor <- false)
    unionMeta.CompileInPlace()
/// Registers 'TMessage in protobuf-net's default runtime model.
let registerSerializableDu<'TMessage> () =
    RuntimeTypeModel.Default |> registerSerializableDuInModel<'TMessage>
// Top-level statement: registers the Message union with the default model.
do registerSerializableDu<Message> ()
/// A MessageA whose string field is null survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with null`` () =
    let original = { X = null; Y = None }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageA whose string field is empty survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with Empty`` () =
    let original = { X = ""; Y = None }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageA carrying Some value in its option field survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with Some`` () =
    let original = { X = "foo"; Y = Some 32 }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageA carrying None in its option field survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageA roundtrips with None`` () =
    let original = { X = "foo"; Y = None }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// A MessageB survives serialize/deserialize unchanged.
[<Fact>]
let ``MessageB roundtrips`` () =
    let original = { A = "bar"; B = "baz" }
    let payload = original |> serialize
    test <@ original = deserialize payload @>
/// Both cases of the Message union survive serialize/deserialize unchanged.
[<Fact>]
let ``roundtrip pair`` () =
    // Round-trips one union value and asserts it comes back equal.
    let verify (original: Message) =
        let restored : Message = original |> serialize |> deserialize
        test <@ restored = original @>
    verify (MessageA { X = "foo"; Y = Some 32 })
    verify (MessageB { A = "bar"; B = "baz" })
/// Repeats the union roundtrip 1000 times.
[<Fact>]
let many () =
    [ 1 .. 1000 ] |> List.iter (fun _ -> ``roundtrip pair`` ())
namespace FunUno
open ProtoBuf
/// Event contracts of the sample game, annotated for protobuf-net.
module UnoGame =
    module Events =
        // NOTE(review): DigitCard and KickbackCard are not defined in the visible part of this file.
        /// Payload of the GameStarted event.
        [<ProtoContract; CLIMutable>]
        type GameStartedEvent =
            { [<ProtoMember(1)>] GameId: int
              [<ProtoMember(2)>] PlayerCount: int
              [<ProtoMember(3)>] TopCard: Card }
        /// Payload of the CardPlayed event.
        and [<ProtoContract; CLIMutable>] CardPlayedEvent =
            { [<ProtoMember(1)>] GameId: int
              [<ProtoMember(2)>] Player: int
              [<ProtoMember(3)>] Card: Card }
        /// The four card colors.
        and [<ProtoContract>] Color =
            | Red
            | Green
            | Blue
            | Yellow
        /// A card is either a digit card or a kickback card.
        and [<ProtoContract>] Card =
            | Digit of DigitCard
            | Kickback of KickbackCard

    /// Union of the events the game emits.
    [<ProtoContract>]
    type Event =
        | GameStarted of Events.GameStartedEvent
        | CardPlayed of Events.CardPlayedEvent
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.