Skip to content

Instantly share code, notes, and snippets.

@adymitruk
Last active January 6, 2021 02:39
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save adymitruk/92be6a19656d68070bc041e9e1e1509a to your computer and use it in GitHub Desktop.
Save adymitruk/92be6a19656d68070bc041e9e1e1509a to your computer and use it in GitHub Desktop.
simple event store in #0tech c#
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Serialization;
using System.Linq;
using domain;
using Messages;
namespace TransactionStorage
{
public static class StringExt { public static string Reverse(this string orig) { return orig.Aggregate("", (x, y) => y + x); } }
public static class Storage {
private static readonly string _path;
private const int LongSize = sizeof (long);
private const int GUIDSize = 16;
static Storage() { _path = ConfigurationManager.AppSettings["path"]; }
public static IHistory GetAggregate<T>(string aggregateId) where T : new() {
var history = (IHistory)new T();
var events = RetrieveFor(aggregateId).ToList();
events.ForEach(history.Replay);
if (events.Count > 10) Add(history.GenerateSnapshot(aggregateId));
return history;
}
public static void Add(Event @event) {
string s = @event.GUID.Reverse();
string filename;
string directory = GetDirectoryAndFileNameForEvents(s, out filename);
if (!Directory.Exists(_path + Path.DirectorySeparatorChar + directory)) Directory.CreateDirectory(_path + Path.DirectorySeparatorChar + directory);
string serializedAggregate;
var xws = new XmlWriterSettings {OmitXmlDeclaration = true};
var ns = new XmlSerializerNamespaces();
ns.Add("", "");
var sb = new StringBuilder();
using (var xmlw = XmlWriter.Create(sb, xws)) {
new XmlSerializer(@event.GetType()).Serialize(xmlw, @event, ns);
serializedAggregate = sb.ToString(); }
sb = new StringBuilder();
var mutex = new Mutex(false, @event.GUID);
mutex.WaitOne();
try {
using (var fs = File.Open(_path + Path.DirectorySeparatorChar + "eventindex", FileMode.Append, FileAccess.Write, FileShare.None)) {
try {
using(var writer = XmlWriter.Create(sb, xws)) {
writer.WriteStartElement("wrapper");
writer.WriteElementString("type", @event.GetType().AssemblyQualifiedName);
writer.WriteStartElement("event");
writer.WriteRaw(serializedAggregate);
writer.WriteEndElement();
writer.WriteEndElement(); }
byte[] contentBytes = Encoding.Default.GetBytes(sb.ToString());
byte[] lengthBytes = BitConverter.GetBytes(contentBytes.Length);
long position;
using (var writer = new FileStream(filename, FileMode.Append)) {
try {
position = writer.Position;
writer.Write(lengthBytes, 0, 4);
writer.Write(contentBytes, 0, contentBytes.Length); }
finally { writer.Close(); } }
if (@event is Snapshot) {
GetDirectoryAndFileNameForSnapshots(s, out filename);
using (var writer = new FileStream(filename, FileMode.OpenOrCreate)) {
try {
writer.Write(BitConverter.GetBytes(position), 0, LongSize); }
finally { writer.Close(); } } }
fs.Write(new Guid(@event.GUID).ToByteArray(), 0, GUIDSize);
fs.Write(BitConverter.GetBytes(position), 0, LongSize); }
finally { fs.Close(); } } }
finally { mutex.ReleaseMutex(); }
}
private static string GetDirectoryAndFileNameForEvents(string s, out string filename) { return GetFileName(s, out filename, ".history"); }
private static void GetDirectoryAndFileNameForSnapshots(string s, out string filename) { GetFileName(s, out filename, ".snapshots"); return; }
private static string GetFileName(string s, out string filename, string extension) {
filename = _path + Path.DirectorySeparatorChar + s.Insert(2, Path.DirectorySeparatorChar + "") + extension;
return s.Substring(0, 2);}
public static IEnumerable<Event> RetrieveFor(string id) {
var s = id.Reverse();
string filename;
var offset = GetStartingIndexFor(s);
GetDirectoryAndFileNameForEvents(s, out filename);
using (var fs = new FileStream(filename,FileMode.Open,FileAccess.Read,FileShare.Read)) {
try {
fs.Position = offset;
while (true) {
var contentBufferLength = new byte[4];
var read = fs.Read(contentBufferLength, 0, 4);
if (read == 0) break;
if (read < 4) throw new ApplicationException("event index integrity error");
yield return GetStoredEvent(fs, contentBufferLength); } }
finally { fs.Close(); } }
yield break;
}
private static long GetStartingIndexFor(string s) {
string file;
GetDirectoryAndFileNameForSnapshots(s, out file);
if (!File.Exists(file)) return 0;
using (var fs = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read)) {
try {
var contentBufferLength = new byte[4];
if (fs.Read(contentBufferLength, 0, 4) < 4) throw new ApplicationException("snapshot index error");
return BitConverter.ToInt32(contentBufferLength, 0); }
finally { fs.Close(); } }
}
private static Event GetStoredEvent(FileStream fs, byte[] contentLengthBuffer) {
int contentLength = BitConverter.ToInt32(contentLengthBuffer, 0);
var content = new byte[contentLength];
if (fs.Read(content, 0, contentLength) < contentLength) throw new ApplicationException("incomplete event information retrieved");
var xmlDocument = new XmlDocument();
xmlDocument.Load(new StringReader(Encoding.Default.GetString(content, 0, contentLength)));
var serializer = new XmlSerializer(Type.GetType(xmlDocument.SelectSingleNode("wrapper/type").InnerText));
return ((Event) serializer.Deserialize(new StringReader(xmlDocument.SelectSingleNode("wrapper/event").InnerXml)));
}
private static Event RetrieveEvent(Guid id, long indexOffset) {
string s = id.ToString().Reverse();
string filename;
GetDirectoryAndFileNameForEvents(s, out filename);
using (var fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read)) {
fs.Position = indexOffset;
var contentBufferLength = new byte[4];
if (fs.Read(contentBufferLength, 0, 4) < 4) throw new ApplicationException("Event integrity problem");
return GetStoredEvent(fs, contentBufferLength); }
}
public static IEnumerable<Event> RetrieveAll() {
string filename = _path + Path.DirectorySeparatorChar + "eventindex";
using (var fs = new FileStream(filename,FileMode.Open,FileAccess.Read,FileShare.Read)) {
try {
while (true) {
var guidBytes = new byte[GUIDSize];
var offsetBytes = new byte[LongSize];
int readBytes = fs.Read(guidBytes,0, GUIDSize);
if (readBytes==0) break;
if (readBytes!=GUIDSize) throw new ApplicationException("index integrity error");
readBytes = fs.Read(offsetBytes, 0, LongSize);
if (readBytes!=LongSize) throw new ApplicationException("index integrity error");
var guid = new Guid(guidBytes);
long indexOffset = BitConverter.ToInt64(offsetBytes, 0);
yield return RetrieveEvent(guid, indexOffset); } }
finally { fs.Close(); } }
yield break;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment