Skip to content

Instantly share code, notes, and snippets.

@leidegre
Created February 11, 2018 18:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leidegre/2ba149694e00491f48613c48cabb1163 to your computer and use it in GitHub Desktop.
Save leidegre/2ba149694e00491f48613c48cabb1163 to your computer and use it in GitHub Desktop.
An event sourcing example minus the cruft to illustrate basic principles of an event store
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
namespace EventSourcingExample
{
struct Event
{
public int Id { get; }
public string StreamId { get; }
public int SequenceNumber { get; }
public JToken Payload { get; }
public DateTimeOffset Created { get; }
public Event(int id, string streamId, int sequenceNumber, JToken payload, DateTimeOffset? created = null)
{
this.Id = id;
this.StreamId = streamId;
this.SequenceNumber = sequenceNumber;
this.Payload = payload;
this.Created = created ?? DateTimeOffset.UtcNow;
}
}
class EventStore
{
private readonly List<Event> _store;
private readonly Dictionary<string, List<int>> _index;
public EventStore()
{
_store = new List<Event>();
_index = new Dictionary<string, List<int>>();
}
public void Append(IEnumerable<Event> uncommitted)
{
var store = _store; // List<Event>
var index = _index; // Dictionary<string, List<int>>
foreach (var e in uncommitted)
{
var id = store.Count + 1; // 1 based
if (index.TryGetValue(e.StreamId, out var stream))
{
if (!(e.SequenceNumber == stream.Count + 1))
throw new InvalidOperationException($"data race detected ({e.StreamId}/{e.SequenceNumber})");
stream.Add(id);
}
else
{
if (!(e.SequenceNumber == 1))
throw new InvalidOperationException($"data race detected ({e.StreamId}/{e.SequenceNumber})");
index.Add(e.StreamId, new List<int> { id });
}
store.Add(new Event(id, e.StreamId, e.SequenceNumber, e.Payload, e.Created));
}
}
public IEnumerable<Event> GetEnumerableStream(string streamId, int minSequenceNumber = 1, int maxSequenceNumber = int.MaxValue)
{
var store = _store; // List<Event>
var index = _index; // Dictionary<string, List<int>>
if (index.TryGetValue(streamId, out var stream))
{
var upperBound = Math.Min(maxSequenceNumber, stream.Count);
for (int i = minSequenceNumber - 1; i < upperBound; i++)
{
yield return store[stream[i] - 1];
}
}
}
public IEnumerable<Event> GetEnumerable(int minId = 1, int maxId = int.MaxValue)
{
var store = _store;
var upperBound = Math.Min(maxId, store.Count);
for (int i = minId - 1; i < upperBound; i++)
{
yield return store[i];
}
}
}
class Program
{
static void Main(string[] args)
{
CreateUserExample();
}
private static void CreateUserExample()
{
var es = new EventStore();
CreateUser("john@tessin.se", es);
ChangeUserName("john@tessin.se", "john+es@tessin.se", es);
Console.WriteLine(JsonConvert.SerializeObject(es.GetEnumerable(), Formatting.Indented));
ProjectUserTable(es);
// we can also compute a running total like this,
// there's no need to query the database for the
// information, we can create many online algorithms like this
// the efficently compute the next "value" as needed
var userCount = AggregateUserCount(0, es.GetEnumerable(1, 1));
Console.WriteLine(userCount);
var userCount2 = AggregateUserCount(userCount, es.GetEnumerable(2, 2));
Console.WriteLine(userCount2);
var userCount3 = AggregateUserCount(userCount2, es.GetEnumerable(3, 3));
Console.WriteLine(userCount3);
}
private static void CreateUser(string userName, EventStore es)
{
// for simplicity sake, use `userName` as `streamId`
var stream = es.GetEnumerableStream(userName);
if (stream.Any())
{
throw new InvalidOperationException("user exists");
}
var e = new Event(0, userName, 1, new JObject { { "@type", "userCreated" }, { "userName", userName } });
es.Append(new[] { e });
}
private static void ChangeUserName(string currentUserName, string newUserName, EventStore es)
{
var stream1 = es.GetEnumerableStream(currentUserName);
if (!stream1.Any())
{
throw new InvalidOperationException("user does not exist");
}
// the check below is redundant because the event store
// will not allow an existing event to be overwritten
//var stream2 = es.GetStream(currentUserName);
//if (stream2.Any())
//{
// throw new InvalidOperationException("user exists");
//}
var last = stream1.Last();
var e1 = new Event(0, currentUserName, last.SequenceNumber + 1, new JObject { { "@type", "userNameChanged" }, { "newUserName", newUserName } });
var e2 = new Event(0, newUserName, 1, new JObject { { "@type", "userCreated" }, { "userName", newUserName } });
es.Append(new[] { e2, e1 });
}
class UserEntity
{
public string UserName { get; set; }
public List<string> UserNameHistory { get; set; }
}
private static void ProjectUserTable(EventStore es)
{
var userTable = new Dictionary<string, UserEntity>();
foreach (var e in es.GetEnumerable())
{
switch ((string)e.Payload["@type"])
{
case "userCreated":
{
userTable[e.StreamId] = new UserEntity { UserName = (string)e.Payload["userName"] };
break;
}
case "userNameChanged":
{
// given the order in which these are created
// we can assume that the old user exists (no need to double check that)
// and given we create the new user before we issue the rename
// we don't have to check that the new user exists either
// we do however have to ensure that the list is not null
var currentUser = userTable[e.StreamId];
userTable.Remove(e.StreamId);
var newUser = userTable[(string)e.Payload["newUserName"]];
if (newUser.UserNameHistory == null)
{
newUser.UserNameHistory = new List<string>();
}
newUser.UserNameHistory.Add(e.StreamId);
break;
}
}
Console.WriteLine(JsonConvert.SerializeObject(userTable, Formatting.Indented));
}
}
private static int AggregateUserCount(int count, IEnumerable<Event> source)
{
foreach (var e in source)
{
switch ((string)e.Payload["@type"])
{
case "userCreated":
{
count++;
break;
}
case "userNameChanged":
{
count--;
break;
}
}
}
return count;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment