Skip to content

Instantly share code, notes, and snippets.

@worldspawn
Created June 18, 2015 04:40
Show Gist options
  • Save worldspawn/57be6c61e6fb47d711e2 to your computer and use it in GitHub Desktop.
Save worldspawn/57be6c61e6fb47d711e2 to your computer and use it in GitHub Desktop.
event funk
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using NPoco;
using NPoco.FluentMappings;
using Xunit;
namespace TechCompare
{
public static class SystemTime
{
private static Func<DateTime> _utcNow;
public static DateTime UtcNow
{
get
{
if (_utcNow == null)
_utcNow = () => DateTime.UtcNow;
return _utcNow();
}
set
{
_utcNow = () => value;
}
}
}
public class TestWazoo
{
private readonly Database _database;
private EventStore _eventStore;
public TestWazoo()
{
var dbConfig = FluentMappingConfiguration.Scan(s =>
{
s.Assembly(Assembly.GetExecutingAssembly());
s.TablesNamed(type => Inflector.AddUnderscores(type.Name));
s.Columns.Named(memberInfo => Inflector.AddUnderscores(memberInfo.Name));
});
var factory = DatabaseFactory.Config(x =>
{
x.WithFluentConfig(dbConfig);
});
_database =
factory.Build(new Database(
@"Data Source=.\SQL2014;Initial Catalog=techcompare;uid=sa;pwd=password;",
DatabaseType.SqlServer2012));
var config = new EventStoreConfig
{
EventsPerSnapshot = 10
};
_eventStore = new EventStore(_database, config);
}
[Fact]
public void LoadTest()
{
var motherBoard = _eventStore.Load<MotherBoard>(Guid.Parse("1ea61a8a-3080-4447-a853-ed690b5ec262"));
//MotherBoard-
Assert.Equal("mb", motherBoard.Name);
Assert.Equal(64, motherBoard.MaxRam);
Assert.Equal(8, motherBoard.RamSlots);
motherBoard.ChangeSwitchConfig(true, false, true);
Assert.True(motherBoard.OnBoardCmosSwitch);
}
[Fact]
public void Test()
{
var brand = new Brand(Guid.NewGuid(), "brand");
var socket = new Socket(Guid.NewGuid(), "socket", SocketSpecification.LGA2011);
var chipset = new Chipset(Guid.NewGuid(), "chipset");
var motherboard = MotherBoard.Create("mb", brand, socket, chipset, RamType.DDR3, MbProfile.ATX);
motherboard.ChangeRamSlots(8);
motherboard.ChangeMaxRam(64);
_eventStore.AppendToStream(motherboard);
}
}
public class EventStoreConfig
{
public int EventsPerSnapshot { get; set; }
}
public class EventStore
{
private readonly IDatabase _db;
private readonly EventStoreConfig _config;
public EventStore(IDatabase db, EventStoreConfig config)
{
_db = db;
_config = config;
}
public TAggregate Load<TAggregate>(Guid id) where TAggregate : class, IAggregate, new()
{
var obj = GetSnapshot<TAggregate>(id) ?? new TAggregate();
var aggregateType = typeof (TAggregate);
var version = obj.Version;
var streamName = String.Format("{0}-{1}", aggregateType.Name, id);
var events = _db.Query<EventStream>("select * from event_stream where stream_name = @0 and version > @1 order by version asc", streamName, version);
foreach (var ev in events)
{
var meta = Deserialize<EventMetadata>(ev.MetaData);
var eventType = Type.GetType(meta.EventClrName);
var @event = Deserialize(eventType, ev.Event) as IEvent;
obj.RunEvent(@event);
obj.Version = ev.Version;
}
return obj;
}
private TAggregate GetSnapshot<TAggregate>(Guid id) where TAggregate : class
{
var aggregateType = typeof (TAggregate);
var streamName = String.Format("{0}-{1}-snapshot", aggregateType.Name, id);
var snapshot = _db.SingleOrDefault<EventStream>("select top 1 * from event_stream where stream_name = @0 order by version desc", streamName);
return snapshot == null ? default(TAggregate) : Deserialize<TAggregate>(snapshot.Event);
}
private void TakeSnapshot(IAggregate aggregate, long originalVersion)
{
if ((originalVersion / _config.EventsPerSnapshot) < (aggregate.Version / _config.EventsPerSnapshot))
{
var eventData = new EventStream
{
EventId = Guid.NewGuid(),
Name = "Snapshot",
Event = Serialize(aggregate),
MetaData = Serialize(new {}),
StreamName = String.Format("{0}-{1}-snapshot", aggregate.GetType().Name, aggregate.Id),
Version = aggregate.Version
};
_db.Insert(eventData);
}
}
private static IEnumerable<EventStream> EventStreamFactory(IEnumerable<IEvent> events, string streamName, long version)
{
return events.Select(es => new EventStream
{
EventId = Guid.NewGuid(),
Name = es.GetType().Name,
Event = Serialize(es),
MetaData = Serialize(new EventMetadata {EventClrName = es.GetType().AssemblyQualifiedName}),
StreamName = streamName,
Version = ++version
});
}
public void AppendToStream(IAggregate aggregate)
{
if (!aggregate.HasEvents)
{
return;
}
var aggregateType = aggregate.GetType().Name;
var streamName = String.Format("{0}-{1}", aggregateType, aggregate.Id);
var originalVersion = aggregate.Version;
var currentVersion = aggregate.Version;
var events = EventStreamFactory(aggregate.GetEvents(), streamName, currentVersion);
_db.BeginTransaction();
var error = true;
try
{
foreach (var e in events)
{
_db.Insert(e);
currentVersion++;
}
aggregate.Version = currentVersion;
TakeSnapshot(aggregate, originalVersion);
error = false;
}
finally
{
if (error)
{
_db.AbortTransaction();
}
else
{
_db.CompleteTransaction();
}
}
}
protected static JsonSerializer Serializer;
static EventStore()
{
Serializer = JsonSerializer.Create(new JsonSerializerSettings()
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
}
private static byte[] Serialize(object data)
{
using (var stream = new MemoryStream())
{
using (var streamWriter = new StreamWriter(stream, Encoding.UTF8))
{
using (var jsonWriter = new JsonTextWriter(streamWriter))
{
Serializer.Serialize(jsonWriter, data);
}
}
return stream.ToArray();
}
}
private static TType Deserialize<TType>(byte[] data) where TType : class
{
return Deserialize(typeof (TType), data) as TType;
}
private static object Deserialize(Type type, byte[] data)
{
using (var stream = new MemoryStream(data, false))
{
using (var streamReader = new StreamReader(stream, Encoding.UTF8))
{
using (var jsonReader = new JsonTextReader(streamReader))
{
return Serializer.Deserialize(jsonReader, type);
}
}
}
}
}
public class EventMetadata
{
public string EventClrName { get; set; }
}
[PrimaryKey("StreamName,Version")]
public class EventStream
{
public string StreamName { get; set; }
public long Version { get; set; }
public Guid EventId { get; set; }
public string Name { get; set; }
public byte[] MetaData { get; set; }
public byte[] Event { get; set; }
}
public interface IEvent
{
Guid? AggregateId { get; set; }
}
public interface IAggregate
{
Guid Id { get; }
long Version { get; set; }
void RunEvent(IEvent @event);
IEnumerable<IEvent> GetEvents();
void ClearEvents();
bool HasEvents { get; }
}
public class Aggregate<TAggregate> : IAggregate
{
public Guid Id { get; protected set; }
public long Version { get; protected set; }
long IAggregate.Version { get { return Version; } set { Version = value; } }
bool IAggregate.HasEvents { get { return _events.Any(); } }
private static readonly Dictionary<Type, MethodInfo> EventMethods;
private readonly IList<IEvent> _events = new List<IEvent>();
private readonly IAggregate _typedReference;
protected Aggregate()
{
_typedReference = this;
}
static Aggregate()
{
var eventType = typeof (IEvent);
var type = typeof(TAggregate);
EventMethods = type.GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
.Where(m =>
{
var parameters = m.GetParameters();
return m.Name == "OnHandle" && parameters.Length == 1 && eventType.IsAssignableFrom(parameters[0].ParameterType);
})
.ToDictionary(z => z.GetParameters()[0].ParameterType);
}
void IAggregate.RunEvent(IEvent @event)
{
if (@event == null) throw new ArgumentNullException("event");
MethodInfo eventHandler;
if (EventMethods.TryGetValue(@event.GetType(), out eventHandler))
{
eventHandler.Invoke(this, new object[] { @event });
}
}
public IEnumerable<IEvent> GetEvents()
{
return _events.AsEnumerable();
}
public void ClearEvents()
{
_events.Clear();
}
protected void HandleEvent(IEvent @event)
{
if (!@event.AggregateId.HasValue)
{
@event.AggregateId = Id;
}
_typedReference.RunEvent(@event);
_events.Add(@event);
}
}
public abstract class Event : IEvent
{
public Guid? AggregateId { get; set; }
}
public class MotherBoard : Aggregate<MotherBoard>
{
public MotherBoard()
{
PciE = Enumerable.Empty<PciExpressSlot>();
Pci = Enumerable.Empty<PciSlot>();
SliCrossFireSupport = Enumerable.Empty<SliCrossFireSupport>();
SataControllers = Enumerable.Empty<SataController>();
UsbControllers = Enumerable.Empty<UsbController>();
Nics = Enumerable.Empty<Nic>();
GraphicsSupport = new IntegratedGraphicsSupport(Enumerable.Empty<DisplayPort>());
SoundController = new SoundController();
}
protected MotherBoard(Guid id, string name, Brand brand, Socket socket, Chipset chipset, RamType ramType, MbProfile mbProfile)
: this()
{
HandleEvent(new Events.Created
{
AggregateId = id,
Name = name,
Brand = brand,
Socket = socket,
Chipset = chipset,
RamType = ramType,
Profile = mbProfile
});
}
public static MotherBoard Create(string name, Brand brand, Socket socket, Chipset chipset, RamType ramType, MbProfile mbProfile)
{
var mb = new MotherBoard(Guid.NewGuid(), name, brand, socket, chipset, ramType, mbProfile);
return mb;
}
public void ChangeMaxRam(int maxRam)
{
HandleEvent(new Events.ChangedMaxRam
{
MaxRam = maxRam
});
}
public void ChangeRamSlots(int ramSlots)
{
HandleEvent(new Events.ChangedRamSlots
{
RamSlots = ramSlots
});
}
public void ChangeSwitchConfig(bool hasOnboardCmosSwitch, bool hasOnboardPowerSwitch, bool hasOnboardResetSwitch)
{
HandleEvent(new Events.ChangedSwitchConfig
{
HasOnboardCmosSwitch = hasOnboardCmosSwitch,
HasOnboardPowerSwitch = hasOnboardPowerSwitch,
HasOnboardResetSwitch = hasOnboardResetSwitch
});
}
public void AddPciExpress(PciExpressSlot slot)
{
HandleEvent(new Events.AddedPciExpressSlot
{
PciExpressSlot = slot
});
}
public void DeletePciExpress(Guid id)
{
HandleEvent(new Events.DeletedPciExpressSlot
{
Id = id
});
}
public void AddPci(PciSlot slot)
{
HandleEvent(new Events.AddedPciSlot());
}
public void DeletePci(Guid id)
{
HandleEvent(new Events.DeletedPciSlot
{
Id = id
});
}
protected void OnHandle(Events.Created @event)
{
Id = @event.AggregateId.Value;
Name = @event.Name;
BrandId = @event.Brand.Id;
SocketId = @event.Socket.Id;
ChipsetId = @event.Chipset.Id;
RamType = @event.RamType;
Profile = @event.Profile;
}
protected void OnHandle(Events.ChangedMaxRam @event)
{
MaxRam = @event.MaxRam;
}
protected void OnHandle(Events.ChangedRamSlots @event)
{
RamSlots = @event.RamSlots;
}
protected void OnHandle(Events.ChangedSwitchConfig @event)
{
OnBoardCmosSwitch = @event.HasOnboardCmosSwitch;
OnBoardPowerSwitch = @event.HasOnboardPowerSwitch;
OnBoardResetSwitch = @event.HasOnboardResetSwitch;
}
protected void OnHandle(Events.AddedPciExpressSlot @event)
{
PciE = PciE.Concat(new [] { @event.PciExpressSlot });
}
protected void OnHandle(Events.DeletedPciExpressSlot @event)
{
PciE = PciE.Where(x => x.Id != @event.Id);
}
protected void OnHandle(Events.AddedPciSlot @event)
{
Pci = Pci.Concat(new[] {new PciSlot()});
}
protected void OnHandle(Events.DeletedPciSlot @event)
{
Pci = Pci.Where(x => x.Id != @event.Id);
}
protected void OnHandle(Events.AddedSliCrossFireSupport @event)
{
SliCrossFireSupport = SliCrossFireSupport.Concat(new [] { @event.SliCrossFireSupport });
}
protected void OnHandle(Events.DeletedSliCrossFireSupport @event)
{
SliCrossFireSupport = SliCrossFireSupport.Where(x => x.Id != @event.Id);
}
protected void OnHandle(Events.AddedSataController @event)
{
SataControllers = SataControllers.Concat(new[] {@event.SataController});
}
protected void OnHandle(Events.DeletedSataController @event)
{
SataControllers = SataControllers.Where(x => x.Id != @event.Id);
}
protected void OnHandle(Events.AddedUsbController @event)
{
UsbControllers = UsbControllers.Concat(new[] { @event.UsbController });
}
protected void OnHandle(Events.DeletedUsbController @event)
{
UsbControllers = UsbControllers.Where(x => x.Id != @event.Id);
}
protected void OnHandle(Events.ChangedDisplayPorts @event)
{
GraphicsSupport = new IntegratedGraphicsSupport(@event.DisplayPorts);
}
protected void OnHandle(Events.AddedNic @event)
{
Nics = Nics.Concat(new[] { @event.Nic });
}
protected void OnHandle(Events.DeletedNic @event)
{
Nics = Nics.Where(x => x.Id != @event.Id);
}
protected void OnHandle(Events.ChangedSoundOptions @event)
{
SoundController.Supports51 = @event.Supports51;
SoundController.Supports71 = @event.Supports71;
}
protected void OnHandle(Events.ChangedUefiSupport @event)
{
Uefi = @event.HasUefi;
}
public string Name { get; private set; }
public Guid BrandId { get; private set; }
public DateTime ReleaseDate { get; private set; }
public Guid SocketId { get; private set; }
public Guid ChipsetId { get; private set; }
public RamType RamType { get; private set; }
public int MaxRam { get; private set; }
public int RamSlots { get; private set; }
public bool OnBoardPowerSwitch { get; private set; }
public bool OnBoardResetSwitch { get; private set; }
public bool OnBoardCmosSwitch { get; private set; }
public MbProfile Profile { get; private set; }
public IEnumerable<PciExpressSlot> PciE { get; private set; }
public IEnumerable<PciSlot> Pci { get; private set; }
public IEnumerable<SliCrossFireSupport> SliCrossFireSupport { get; private set; }
public IEnumerable<SataController> SataControllers { get; private set; }
public IEnumerable<UsbController> UsbControllers { get; private set; }
public IntegratedGraphicsSupport GraphicsSupport { get; private set; }
public IEnumerable<Nic> Nics { get; private set; }
public SoundController SoundController { get; private set; }
public bool Uefi { get; set; }
public static class Events
{
public class Created : Event
{
public string Name { get; set; }
public Brand Brand { get; set; }
public Socket Socket { get; set; }
public Chipset Chipset { get; set; }
public RamType RamType { get; set; }
public MbProfile Profile { get; set; }
}
public class ChangedMaxRam : Event
{
public int MaxRam { get; set; }
}
public class ChangedRamSlots : Event
{
public int RamSlots { get; set; }
}
public class ChangedSwitchConfig : Event
{
public bool HasOnboardPowerSwitch { get; set; }
public bool HasOnboardResetSwitch { get; set; }
public bool HasOnboardCmosSwitch { get; set; }
}
public class AddedPciExpressSlot : Event
{
public PciExpressSlot PciExpressSlot { get; set; }
}
public class DeletedPciExpressSlot : Event
{
public Guid Id { get; set; }
}
public class AddedPciSlot : Event
{
}
public class DeletedPciSlot : Event
{
public Guid Id { get; set; }
}
public class AddedSliCrossFireSupport : Event
{
public SliCrossFireSupport SliCrossFireSupport { get; set; }
}
public class DeletedSliCrossFireSupport : Event
{
public Guid Id { get; set; }
}
public class AddedSataController : Event
{
public SataController SataController { get; set; }
}
public class DeletedSataController : Event
{
public Guid Id { get; set; }
}
public class AddedUsbController : Event
{
public UsbController UsbController { get; set; }
}
public class DeletedUsbController : Event
{
public Guid Id { get; set; }
}
public class ChangedDisplayPorts : Event
{
public IEnumerable<DisplayPort> DisplayPorts { get; set; }
}
public class AddedNic : Event
{
public Nic Nic { get; set; }
}
public class DeletedNic : Event
{
public Guid Id { get; set; }
}
public class ChangedSoundOptions : Event
{
public bool Supports51 { get; set; }
public bool Supports71 { get; set; }
}
public class ChangedUefiSupport : Event
{
public bool HasUefi { get; set; }
}
}
}
public class Socket : Aggregate<Socket>
{
public Socket(Guid id, string name, SocketSpecification specification)
{
Id = id;
Name = name;
Specification = specification;
}
public string Name { get; private set; }
public SocketSpecification Specification { get; private set; }
}
public class Chipset : Aggregate<Chipset>
{
public Chipset(Guid id, string name)
{
Id = id;
Name = name;
}
public string Name { get; private set; }
}
public class Brand : Aggregate<Brand>
{
public Brand(Guid id, string name)
{
Id = id;
Name = name;
}
public string Name { get; private set; }
}
public class PciExpressSlot
{
public PciExpressSlot()
{
Id = Guid.NewGuid();
}
public Guid Id { get; set; }
public PcieType Type { get; private set; }
}
public class PciSlot
{
public PciSlot()
{
Id = Guid.NewGuid();
}
public Guid Id { get; set; }
}
public class SliCrossFireSupport
{
public SliCrossFireSupport()
{
Id = Guid.NewGuid();
}
public Guid Id { get; set; }
public IEnumerable<PcieType> SupportedConfiguration { get; private set; }
}
public class SataController
{
public SataController()
{
Id = Guid.NewGuid();
}
public Guid Id { get; set; }
public SataType Type { get; private set; }
public int InternalPorts { get; private set; }
public int ExternalPorts { get; private set; }
public Raid RaidSupport { get; private set; }
}
public class UsbController
{
public UsbController()
{
Id = Guid.NewGuid();
}
public Guid Id { get; set; }
public UsbType Type { get; private set; }
public int InternalPorts { get; private set; }
public int ExternalPorts { get; private set; }
}
public class IntegratedGraphicsSupport
{
public IntegratedGraphicsSupport(IEnumerable<DisplayPort> ports)
{
Ports = ports;
}
public IEnumerable<DisplayPort> Ports { get; private set; }
}
public class SoundController
{
public bool Supports51 { get; set; }
public bool Supports71 { get; set; }
}
public class Nic
{
public Nic()
{
Id = Guid.NewGuid();
}
public Guid Id { get; set; }
public bool IsIntel { get; private set; }
public bool SupportsTeaming { get; private set; }
public bool SupportsWol { get; private set; }
public NicType Type { get; private set; }
}
public enum SocketSpecification
{
None = 0,
LGA2011
}
public enum NicType
{
None = 0,
Megabit10,
Megabit100,
Gigabit1,
Gigabit2
}
public enum MbProfile
{
None = 0,
ATX,
eATX,
}
public enum DisplayPort
{
None,
MiniDisplayPort,
VGA,
DVI,
HDMI,
MiniHDMI
}
[Flags]
public enum Raid
{
None = 0,
Raid0 = 1 << 0,
Raid1 = 1 << 1,
Raid2 = 1 << 2,
Raid3 = 1 << 3,
Raid4 = 1 << 4,
Raid5 = 1 << 5,
Raid0_1 = Raid0 | Raid1
}
public enum UsbType
{
None = 0,
USB2,
USB3,
USB3_1
}
public enum SataType
{
None = 0,
Sata,
SataII,
SataIII,
SataM2
}
public enum RamType
{
None = 0,
DDR3,
DDR4,
DDR5
}
public enum PcieType
{
None = 0,
X1,
X2,
X4,
X8,
X16
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment