Skip to content

Instantly share code, notes, and snippets.

@ralfw
Last active November 27, 2019 16:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ralfw/628d2b3ba93825eecde46b08575915e1 to your computer and use it in GitHub Desktop.
Save ralfw/628d2b3ba93825eecde46b08575915e1 to your computer and use it in GitHub Desktop.
DDC 2019 - Event-Orientation Workshop
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using Newtonsoft.Json; // NuGet package dependency
namespace eventorientation
{
public interface IEventstore : IDisposable {
event Action<Version, long, Event[]> OnRecorded;
(Version version, long finalEventNumber) Record(params Event[] events);
(Version version, long finalEventNumber) Record(Version expectedVersion, params Event[] events);
Version Version(string id);
long Length { get; }
Event[] Replay(params Type[] eventTypes);
Event[] Replay(long firstEventNumber, params Type[] eventTypes);
(Version[] versions, Event[] events) ReplayWithVersion(Func<Event,string> mapToVersionId, params Type[] eventTypes);
(Version[] versions, Event[] events) ReplayWithVersion(long firstEventNumber, Func<Event,string> mapToVersionId, params Type[] eventTypes);
}
public abstract class Event {
public string EventId { get; set; }
protected Event() { EventId = Guid.NewGuid().ToString(); }
}
public class Version {
public string Id { get; }
public string Number { get; }
internal Version(string id, string number) {
if (string.IsNullOrWhiteSpace(id)) throw new InvalidOperationException("Id of version must not be empty/null!");
Id = id;
Number = number;
}
public Version(string id) : this(id, "*") {}
}
public class FilesInFolderEventstore : IEventstore
{
private const string DEFAUL_PATH = "eventstore.db";
public event Action<Version, long, Event[]> OnRecorded = (v, f, e) => { };
private readonly Lock _lock;
private readonly FilesInFolderEventRepository _repo;
private readonly Versions _vers;
public FilesInFolderEventstore() : this(DEFAUL_PATH) {}
public FilesInFolderEventstore(string path) {
_repo = new FilesInFolderEventRepository(Path.Combine(path, "events"));
_vers = new Versions(Path.Combine(path, "versions"));
_lock = new Lock();
}
public (Version version, long finalEventNumber) Record(params Event[] events) => Record(null, events);
public (Version version, long finalEventNumber) Record(Version expectedVersion, params Event[] events) {
Version newVersion = null;
long finalEventNumber = -1;
_lock.TryWrite(() => {
newVersion = _vers.Update(expectedVersion);
Store_events();
});
OnRecorded(newVersion, finalEventNumber, events);
return (newVersion, finalEventNumber);
void Store_events() {
var n = _repo.Count;
events.ToList().ForEach(e => _repo.Store(n++, e));
finalEventNumber = _repo.Count;
}
}
public Version Version(string id) => _vers[id];
public long Length => _repo.Count;
public Event[] Replay(params Type[] eventTypes) => Replay(-1, eventTypes);
public Event[] Replay(long firstEventNumber, params Type[] eventTypes)
=> _lock.TryRead(() => {
var allEvents = AllEvents(firstEventNumber);
return Filter(allEvents, eventTypes).ToArray();
});
public (Version[] versions, Event[] events) ReplayWithVersion(Func<Event, string> mapToVersionId, params Type[] eventTypes)
=> ReplayWithVersion(-1, mapToVersionId, eventTypes);
public (Version[] versions, Event[] events) ReplayWithVersion(long firstEventNumber, Func<Event, string> mapToVersionId, params Type[] eventTypes) {
Version[] versions = new Version[0];
var events = _lock.TryRead(() => {
var allEvents = AllEvents(firstEventNumber);
var filteredEvents = Filter(allEvents, eventTypes).ToArray();
versions = Retrieve_versions(filteredEvents).ToArray();
return filteredEvents;
});
return (versions, events);
IEnumerable<Version> Retrieve_versions(IEnumerable<Event> events) {
var idsRetrieved = new HashSet<string>();
foreach (var e in events) {
var versionId = mapToVersionId(e);
if (string.IsNullOrEmpty(versionId)) continue;
if (idsRetrieved.Contains(versionId)) continue;
yield return _vers[versionId];
idsRetrieved.Add(versionId);
}
}
}
private IEnumerable<Event> AllEvents(long firstEventNumber) {
var n = _repo.Count;
for (var i = firstEventNumber < 0 ? 0 : firstEventNumber; i < n; i++)
yield return _repo.Load(i);
}
private IEnumerable<Event> Filter(IEnumerable<Event> events, Type[] eventTypes) {
if (eventTypes.Length <= 0) return events;
var eventTypes_ = new HashSet<Type>(eventTypes);
return events.Where(e => eventTypes_.Contains(e.GetType()));
}
public void Dispose() {
_repo.Dispose();
}
}
internal static class EventSerialization {
public static string Serialize(this Event e) {
var eventName = e.GetType().AssemblyQualifiedName;
var data = JsonConvert.SerializeObject(e);
var parts = new[]{eventName, data};
return string.Join("\n", parts);
}
public static Event Deserialize(this string e) {
var lines = e.Split('\n');
var eventName = lines.First();
var data = string.Join("\n", lines.Skip(1));
return (Event)JsonConvert.DeserializeObject(data, Type.GetType(eventName));
}
}
internal class Versions : IDisposable
{
private readonly string _path;
public Versions(string path) {
_path = path;
if (Directory.Exists(_path) is false)
Directory.CreateDirectory(_path);
}
public Version this[string id] {
get {
var filename = VersionFilename(id);
if (File.Exists(filename) is false)
return new Version(id);
return new Version(
id,
File.ReadAllText(filename));
}
}
public Version Update(Version expectedVersion) {
if (expectedVersion == null) return null;
var filename = VersionFilename(expectedVersion.Id);
if (File.Exists(filename) is false) {
var newVersion = new Version(expectedVersion.Id, "1");
File.WriteAllText(filename, newVersion.Number);
return newVersion;
}
var currentNumber = File.ReadAllText(filename);
if (expectedVersion.Number != "*" && expectedVersion.Number != currentNumber) throw new InvalidOperationException($"Expected version '{expectedVersion.Number}' for {expectedVersion.Id} does not match current '{currentNumber}'!");
currentNumber = (int.Parse(currentNumber) + 1).ToString();
var updatedVersion = new Version(expectedVersion.Id, currentNumber);
File.WriteAllText(filename, updatedVersion.Number);
return updatedVersion;
}
private string VersionFilename(string id) => Path.Combine(_path, id + ".txt");
public void Dispose() {}
}
internal class FilesInFolderEventRepository : IDisposable
{
private readonly string _path;
public FilesInFolderEventRepository(string path) {
_path = path;
if (Directory.Exists(_path) is false)
Directory.CreateDirectory(_path);
}
public void Store(long index, Event e) {
var text = EventSerialization.Serialize(e);
Store(index, text);
}
private void Store(long index, string text) {
if (index < 0) throw new InvalidOperationException("Event index must be >= 0!");
var filepath = FilepathFor(index);
if (File.Exists(filepath)) throw new InvalidOperationException($"Event with index {index} has already been stored and cannot be overwritten!");
File.WriteAllText(filepath, text);
}
public Event Load(long index) {
var text = File.ReadAllText(FilepathFor(index));
return EventSerialization.Deserialize(text);
}
public long Count => Directory.GetFiles(_path).Length;
private string FilepathFor(long index) => System.IO.Path.Combine(_path, $"{index:x16}.txt");
public void Dispose() { }
}
internal class Lock
{
private const int LOCK_ACQUISITION_TIMEOUT_MSEC = 5000;
private readonly ReaderWriterLock _lock;
public Lock() {
_lock = new ReaderWriterLock();
}
public void TryWrite(Action f) {
_lock.AcquireWriterLock(LOCK_ACQUISITION_TIMEOUT_MSEC);
try {
f();
}
finally {
_lock.ReleaseWriterLock();
}
}
public T TryRead<T>(Func<T> f) {
_lock.AcquireReaderLock(LOCK_ACQUISITION_TIMEOUT_MSEC);
try {
return f();
}
finally {
_lock.ReleaseReaderLock();
}
}
}
}
namespace eventorientation
{
public interface IMessage {}
public interface IIncoming : IMessage {}
public interface IOutgoing : IMessage {}
public abstract class Request : IIncoming {}
public abstract class Response : IOutgoing {}
public abstract class Notification : IIncoming, IOutgoing {}
public abstract class Command : Request {}
public abstract class CommandStatus : Response {}
public class Success : CommandStatus {}
public class Success<T> : Success {
public T Value { get; }
public Success(T value) { Value = value; }
}
public class Failure : CommandStatus {
public string Errormessage { get; }
public Failure(string errormessage) { Errormessage = errormessage; }
}
public abstract class Query : Request {}
public abstract class QueryResult : Response {}
public class NoResponse : Response {}
}
using System;
using System.Collections.Generic;
using System.Linq;
namespace eventorientation
{
public abstract class MessageModel {}
public sealed class EmptyMessageModel : MessageModel {}
public delegate (TMessageModel model, Version version) LoadDelegate<in TIncoming, TMessageModel>(IEventstore es, TIncoming message)
where TIncoming : IIncoming
where TMessageModel : MessageModel;
public delegate void UpdateDelegate(IEventstore es, Event[] events, Version version);
public delegate TQueryResult
HandleQueryDelegate<in TQuery, in TMessageModel, out TQueryResult>(TQuery query, TMessageModel model)
where TQuery : Query
where TMessageModel : MessageModel
where TQueryResult : QueryResult;
public delegate (CommandStatus commandStatus, Event[] events, Notification[] notifications)
HandleCommandDelegate<in TCommand, in TMessageModel>(TCommand command, TMessageModel model)
where TCommand : Command
where TMessageModel : MessageModel;
public delegate Command[] HandleNotificationDelegate<in TNotification, in TMessageModel>(TNotification notification, TMessageModel model)
where TNotification : Notification
where TMessageModel : MessageModel;
public delegate (Response response, Notification[] notifications) PipelineDelegate(IIncoming message);
public interface IQueryPipeline<in TQuery, TMessageModel, out TQueryResult>
where TQuery : Query
where TMessageModel : MessageModel
where TQueryResult : QueryResult
{
(TMessageModel model, Version version) Load(IEventstore es, TQuery query);
TQueryResult Project(TQuery query, TMessageModel model);
void Update(IEventstore es, Event[] events, Version version);
}
public interface ICommandPipeline<in TCommand, TMessageModel>
where TCommand : Command
where TMessageModel : MessageModel
{
(TMessageModel model, Version version) Load(IEventstore es, TCommand command);
(CommandStatus commandStatus, Event[] events, Notification[] notifications) Execute(TCommand command, TMessageModel model);
void Update(IEventstore es, Event[] events, Version version);
}
public class MessagePump
{
private readonly IEventstore _es;
private readonly Dictionary<Type, PipelineDelegate> _pipelines;
private event UpdateDelegate _update;
public MessagePump(IEventstore es) {
_es = es;
_pipelines = new Dictionary<Type, PipelineDelegate>();
}
public void RegisterQueryPipeline<TQuery,TMessageModel,TQueryResult>(
LoadDelegate<TQuery,TMessageModel> load,
HandleQueryDelegate<TQuery,TMessageModel,TQueryResult> process,
UpdateDelegate update
)
where TQuery : Query
where TMessageModel : MessageModel
where TQueryResult : QueryResult
{
_pipelines.Add(typeof(TQuery), msg => {
var (model, _) = load(_es, msg as TQuery);
var result = process(msg as TQuery, model);
return (result, new Notification[0]);
});
_update += update;
}
public void RegisterQueryPipeline<TQuery, TMessageModel, TQueryResult>(IQueryPipeline<TQuery, TMessageModel, TQueryResult> pipeline)
where TQuery : Query
where TMessageModel : MessageModel
where TQueryResult : QueryResult
=> RegisterQueryPipeline<TQuery,TMessageModel,TQueryResult>(pipeline.Load, pipeline.Project, pipeline.Update);
public void RegisterCommandPipeline<TCommand, TMessageModel>(
LoadDelegate<TCommand, TMessageModel> load,
HandleCommandDelegate<TCommand, TMessageModel> process,
UpdateDelegate update
)
where TCommand : Command
where TMessageModel : MessageModel
{
_pipelines.Add(typeof(TCommand), msg => {
var (model, version) = load(_es, msg as TCommand);
var (commandStatus, events, notifications) = process(msg as TCommand, model);
var (newVersion, _) = _es.Record(version, events);
if (events.Any()) _update(_es, events, newVersion);
return (commandStatus, notifications);
});
_update += update;
}
public void RegisterCommandPipeline<TCommand, TMessageModel>(ICommandPipeline<TCommand, TMessageModel> pipeline)
where TCommand : Command
where TMessageModel : MessageModel
=> RegisterCommandPipeline<TCommand, TMessageModel>(pipeline.Load, pipeline.Execute, pipeline.Update);
public void RegisterNotificationPipeline<TNotification, TMessageModel>(
LoadDelegate<TNotification, TMessageModel> load,
HandleNotificationDelegate<TNotification, TMessageModel> process
)
where TNotification : Notification
where TMessageModel : MessageModel
{
_pipelines.Add(typeof(TNotification), msg => {
var (model, version) = load(_es, msg as TNotification);
var commands = process(msg as TNotification, model);
var allNotifications = commands.SelectMany(cmd => Handle(cmd).notifications);
return (new NoResponse(), allNotifications.ToArray());
});
}
private bool _hasBeenStarted;
public void Start() {
if (_hasBeenStarted) return;
_update(_es, new Event[0], null);
_hasBeenStarted = true;
}
public (Response response, Notification[] notifications) Handle(IIncoming input) {
Start();
return _pipelines[input.GetType()](input);
}
public (TResponse response, Notification[] notifications) Handle<TResponse>(IIncoming input)
where TResponse : Response
{
var result = Handle(input);
return (result.response as TResponse, result.notifications);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment