Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 14:03
Show Gist options
  • Save serialseb/d9a5926849ee9a0c6a6d to your computer and use it in GitHub Desktop.
Save serialseb/d9a5926849ee9a0c6a6d to your computer and use it in GitHub Desktop.
Projecting things
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Bson.Serialization.IdGenerators;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using NEventStore;
using NEventStore.Dispatcher;
// Console harness that pushes events into an event store and checks the projections
// built from them by several concurrent consumers.
// NOTE(review): the block delimiters of this namespace and class are not present in this
// copy of the file.
namespace TestNewProjections
class Program
// Passed to TestContention as the number of projectors (MessageConsumer instances).
const int CONSUMER_COUNT = 10;
// Number of streams TestContention creates.
const int AGGREGATE_COUNT = 100;
// Number of events TestContention adds to each stream.
const int EVENT_COUNT = 1000;
// Name of the MongoDB database PrepareMongo looks for and opens.
const string DATABASE_NAME = "projections";
/// <summary>
/// Entry point: runs the contention test against the in-memory projection store with
/// <c>CONSUMER_COUNT</c> projectors and the dispatcher's random delays switched off.
/// </summary>
/// <param name="args">Command-line arguments; not read.</param>
static void Main(string[] args)
{
    TestContention(new InMemoryProjectionStore(), delay: false, projectors: CONSUMER_COUNT);

    // Alternative run against MongoDB: uncomment both lines below to use it.
    //var mongo = PrepareMongo();
    //TestContention(new MongoProjectionStore(mongo), delay: false, projectors: CONSUMER_COUNT);
}
// Connects to the MongoDB server at "mongodb://quantum", opens the DATABASE_NAME database
// and returns its "projection" collection after requesting a descending index on
// Metadata.Version.
static MongoCollection<ProjectionContainer> PrepareMongo()
var client = new MongoClient("mongodb://quantum");
var server = client.GetServer();
// NOTE(review): the statement this check is meant to guard is not present in this copy of
// the file; as written the `if` binds to the declaration on the next line. Confirm against
// the original what should happen when the database already exists.
if (server.GetDatabaseNames().Contains(DATABASE_NAME))
var db = server.GetDatabase(DATABASE_NAME);
var mongoCollection = db.GetCollection<ProjectionContainer>("projection");
// Index on the projection version, highest first.
mongoCollection.CreateIndex(IndexKeys<ProjectionContainer>.Descending(_ => _.Metadata.Version));
return mongoCollection;
// Runs one contention scenario: creates an InMemoryDispatcher and an event store, starts
// `projectors` MessageConsumer instances against `projections`, adds EVENT_COUNT events to
// each of AGGREGATE_COUNT streams, spins until the dispatcher has seen `commitCount`
// commits, then inspects the stored projections.
// NOTE(review): block delimiters and several statements are absent from this copy of the
// file; the comments below flag each place where something appears to be missing.
static void TestContention(IProjectionStore projections, bool delay, int projectors)
// `delay` becomes the dispatcher's randomDelays flag; runAsync is always true here.
var dispatcher = new InMemoryDispatcher(delay, true);
// NOTE(review): the remainder of this Wireup chain, including its terminating `;`, is missing.
var es = Wireup.Init()
// NOTE(review): testStreamId is not used by any line visible in this method.
var testStreamId = Guid.NewGuid().ToString();
// One consumer per projector; the lambda parameter `_` is passed as the consumer's index.
var consumers = Enumerable.Range(0, projectors).Select(_ => new MessageConsumer(es, projections, _)).ToArray();
Console.WriteLine("Testing {0} - Starting {1} projectors", projections.GetType().Name, projectors);
foreach (var consumer in consumers) consumer.Start();
var random = new Random();
Console.WriteLine("Pushing {0} events on {1} aggregates", EVENT_COUNT, AGGREGATE_COUNT);
// NOTE(review): no Start()/Stop() call for either stopwatch is visible in this copy.
var sw = new Stopwatch();
var postWrites = new Stopwatch();
// NOTE(review): no visible line increments commitCount; presumably that happened inside the
// missing commit bodies below. Confirm.
int commitCount = 0;
for (var i = 0; i < AGGREGATE_COUNT; i++)
var s = es.CreateStream(Guid.NewGuid());
for (var j = 0; j < EVENT_COUNT; j++)
s.Add(new EventMessage { Body = j });
// Divisor is a random value in 1..4, so this fires at irregular points in the stream.
// NOTE(review): the guarded statement is missing (presumably it commits the pending events).
if (j % (random.Next(4) + 1) == 0)
// NOTE(review): the guarded statement is missing (presumably a final commit of leftovers).
if (s.UncommittedEvents.Any())
Console.WriteLine("Wrote {0} commits, Waiting for results.", commitCount);
// Busy-wait (empty loop body) until the dispatcher reports at least commitCount commits.
while (dispatcher.DispatchedCommit < commitCount);
var results = ReadResults();
// NOTE(review): the format string has four placeholders ({0}-{3}) but only three arguments
// are visible, and `results` is otherwise unused; an argument line appears to be missing.
Console.WriteLine("Ran projections {0} times for {1} commits in {2}, processing for {3} after finishing writing events.",
commitCount, sw.Elapsed, postWrites.Elapsed);
// Snapshot the store's contents; one entry per aggregate is expected.
var all = projections.ToList();
if (all.Count != AGGREGATE_COUNT)
Console.WriteLine("Not enough projections.");
// Per-projection checks against EVENT_COUNT.
// NOTE(review): the bodies of the three conditions below are missing. Also note the first
// condition uses &&, so it only matches when BOTH version and projection value differ.
foreach (var projection in projections)
if (projection.Metadata.Version != EVENT_COUNT && projection.Projection != EVENT_COUNT)
if (projection.Metadata.Version != EVENT_COUNT)
else if (projection.Projection != EVENT_COUNT)
// Dequeues from MessageDispatcher.Results until it reports empty and returns the contents
// of the StringBuilder.
// NOTE(review): the statement that uses each dequeued character (presumably an append to
// `sb`) is not present in this copy of the file, so as written `sb` is never filled.
static string ReadResults()
var sb = new StringBuilder();
while(MessageDispatcher.Results.IsEmpty == false)
char outValue;
// TryDequeue may fail if another thread emptied the queue after the IsEmpty check.
if (MessageDispatcher.Results.TryDequeue(out outValue))
return sb.ToString();
// IDispatchCommits implementation that forwards commits to MessageDispatcher.Queue and
// counts how many it has processed.
// NOTE(review): block delimiters and several statements are absent from this copy of the
// file; the comments below flag the visible gaps.
class InMemoryDispatcher : IDispatchCommits
// When true, ProcessCommit takes the (currently missing) delay branch.
readonly bool _randomDelays;
// NOTE(review): no visible line assigns _runAsync; the constructor only uses the parameter
// to choose _run.
readonly bool _runAsync;
readonly Random random = new Random();
// Strategy selected in the constructor: ProcessCommit or ProcessCommitAsync.
Action<ICommit> _run;
// Number of commits processed; incremented with Interlocked in ProcessCommit.
int _dispatchedCommit;
public InMemoryDispatcher(bool randomDelays, bool runAsync)
_randomDelays = randomDelays;
_run = ProcessCommit;
if (runAsync) _run = ProcessCommitAsync;
// NOTE(review): the body of Dispatch is missing; presumably it invokes _run(commit).
public void Dispatch(ICommit commit)
// Runs ProcessCommit on a task created with TaskCreationOptions.LongRunning.
void ProcessCommitAsync(ICommit commit)
//Task.Run(() => ProcessCommit(commit));
Task.Factory.StartNew(() => ProcessCommit(commit), TaskCreationOptions.LongRunning);
// Queues an EventStreamPersisted message for the commit and bumps the dispatched counter.
void ProcessCommit(ICommit commit)
// NOTE(review): the statement guarded by this check is missing (presumably a random sleep).
if (_randomDelays)
// NOTE(review): the constructor arguments and the closing of this call are missing.
MessageDispatcher.Queue.Add(new EventStreamPersisted(
Interlocked.Increment(ref _dispatchedCommit);
// Count of commits processed so far; polled by TestContention.
public int DispatchedCommit
get { return _dispatchedCommit; }
// NOTE(review): the body of Dispose is not visible in this copy.
public void Dispose()
/// <summary>
/// Process-wide hand-off point between the commit dispatcher and the projection consumers.
/// </summary>
public class MessageDispatcher
{
    /// <summary>
    /// Messages waiting to be projected. <c>InMemoryDispatcher</c> adds to it and
    /// <c>MessageConsumer</c> takes from it.
    /// </summary>
    public static readonly BlockingCollection<EventStreamPersisted> Queue =
        new BlockingCollection<EventStreamPersisted>();

    /// <summary>
    /// Characters drained by <c>Program.ReadResults</c>.
    /// </summary>
    public static readonly ConcurrentQueue<char> Results = new ConcurrentQueue<char>();
}
// Consumer that takes EventStreamPersisted messages from MessageDispatcher.Queue and runs
// a ProjectionRunner for each one.
// NOTE(review): block delimiters and several statements are absent from this copy of the
// file; the comments below flag the visible gaps.
public class MessageConsumer
readonly CancellationTokenSource _cancelSource;
// Task wrapping Read; created (not started) in the constructor.
readonly Task _consume;
readonly IStoreEvents _es;
// Index handed to each ProjectionRunner this consumer creates.
readonly int _index;
readonly IProjectionStore _projectionStore;
// Set to true while a message is being projected and back to false afterwards.
public bool Consuming { get; set; }
public MessageConsumer(IStoreEvents es, IProjectionStore projectionStore, int index)
_es = es;
_projectionStore = projectionStore;
_index = index;
_cancelSource = new CancellationTokenSource();
_consume = new Task(Read, _cancelSource.Token);
// NOTE(review): the body of Start is missing; presumably it starts _consume.
public void Start()
// Blocks on Queue.Take(), then projects the message's stream up to the message's version.
// NOTE(review): the trailing `while (true);` suggests a `do` loop whose header is missing;
// as visible, only one message would be handled before an empty infinite loop.
void Read()
var message = MessageDispatcher.Queue.Take();
Consuming = true;
new ProjectionRunner(_es, _projectionStore, _index).Project(message.StreamId, message.Version);
Consuming = false;
while (true);
// NOTE(review): the body of Stop is missing; presumably it cancels _cancelSource.
void Stop()
// Brings the stored projection of one stream up to date from the event store, retrying
// when the projection store rejects the update.
// NOTE(review): block delimiters and several statements are absent from this copy of the
// file; the comments below flag the visible gaps.
class ProjectionRunner
readonly IStoreEvents _eventStore;
// NOTE(review): _index is stored but not read by any line visible in this class.
readonly int _index;
readonly IProjectionStore _projectionStore;
public ProjectionRunner(IStoreEvents eventStore, IProjectionStore projectionStore, int index)
_eventStore = eventStore;
_projectionStore = projectionStore;
_index = index;
// Calls TryProject up to 500 times and throws InvalidOperationException afterwards.
// NOTE(review): the statement guarded by the `if` is missing (presumably a return on
// success); as visible, the `if` binds to the throw.
public void Project(string streamId, int version)
for (var i = 0; i < 500; i++)
if (TryProject(streamId, version))
throw new InvalidOperationException("Tried 500 times and gave up");
// Single attempt: load the latest projection, read commits after its version, and store
// the result conditioned on the version that was loaded. Returns the store's TryUpdate
// result, or true when there is nothing to do.
bool TryProject(string streamId, int version)
var projection = _projectionStore.LoadLatest(streamId);
// -1 is the value passed to TryUpdate when no projection existed.
var existingVersion = -1;
if (projection == null)
projection = new ProjectionContainer { StreamId = streamId };
// NOTE(review): presumably this assignment belongs to an `else` that is missing here;
// as visible it runs unconditionally and would overwrite -1 for a new projection.
existingVersion = projection.Metadata.Version;
// Stored projection already covers the requested version.
if (existingVersion >= version)
return true;
var commits = _eventStore.Advanced.GetFrom(streamId, existingVersion + 1, int.MaxValue).ToList();
var eventMessages = commits.SelectMany(_ => _.Events).ToList();
// No events returned for that range: treated as success.
if (eventMessages.Any() == false)
return true;
var newVersion = commits.Max(_ => _.StreamRevision);
// NOTE(review): the loop body that applies each event to the projection is missing.
foreach (var ev in eventMessages)
// apply events to projection
projection.Metadata.Version = newVersion;
// The store decides whether the write wins, based on the version loaded above.
return _projectionStore.TryUpdate(projection, existingVersion);
/// <summary>
/// Immutable message carrying a stream identifier and a version. Consumers pass both
/// values to <c>ProjectionRunner.Project</c>.
/// </summary>
public class EventStreamPersisted
{
    /// <summary>Creates a message for the given stream and version.</summary>
    /// <param name="streamId">Identifier of the stream.</param>
    /// <param name="version">Version the projection should reach.</param>
    public EventStreamPersisted(string streamId, int version)
    {
        StreamId = streamId;
        Version = version;
    }

    /// <summary>Identifier of the stream; set once by the constructor.</summary>
    public string StreamId { get; private set; }

    /// <summary>Version carried by the message; set once by the constructor.</summary>
    public int Version { get; private set; }
}
/// <summary>
/// Storage for projections, one per stream, that can also be enumerated in full.
/// </summary>
public interface IProjectionStore : IEnumerable<ProjectionContainer>
{
    /// <summary>Loads the most recent projection stored for a stream.</summary>
    /// <param name="streamId">Identifier of the stream.</param>
    /// <returns>
    /// The stored projection. Callers in this file treat <c>null</c> as "no projection yet".
    /// </returns>
    ProjectionContainer LoadLatest(string streamId);

    /// <summary>
    /// Attempts to store <paramref name="data"/>, conditioned on the version the caller
    /// previously loaded.
    /// </summary>
    /// <param name="data">Projection to store.</param>
    /// <param name="existingVersion">
    /// Version the caller loaded before modifying the projection; <c>-1</c> when the caller
    /// found no projection.
    /// </param>
    /// <returns><c>true</c> if the update was stored; <c>false</c> if it was rejected.</returns>
    bool TryUpdate(ProjectionContainer data, int existingVersion = -1);
}
// IProjectionStore implementation over a MongoDB collection of ProjectionContainer documents.
// NOTE(review): block delimiters and several tokens (the `try` paired with the `catch`
// below, the close of the FindAndModifyArgs initializer, and the statement guarded by the
// error-code check) are absent from this copy of the file.
public class MongoProjectionStore : IProjectionStore
readonly MongoCollection<ProjectionContainer> _db;
// NOTE(review): MongoDB documents its duplicate-key error code as 11000; confirm whether
// 1100 is intentional or a typo.
const int ERROR_DUPLICATE_KEY = 1100;
public MongoProjectionStore(MongoCollection<ProjectionContainer> db)
_db = db;
// Looks the projection up by document id, passing the stream id as that id.
// NOTE(review): no id mapping for StreamId is visible in this file; confirm how it is mapped.
public ProjectionContainer LoadLatest(string streamId)
return _db.FindOneById(streamId);
// Issues a FindAndModify upsert that matches on stream id AND the expected existing
// version, setting Metadata and Projection from `data`. Returns true when the command
// result reports Ok; the visible fall-through path returns false.
public bool TryUpdate(ProjectionContainer data, int existingVersion = -1)
var documentToUpdate = Query.And(Query<ProjectionContainer>.EQ(_ => _.StreamId, data.StreamId),
Query<ProjectionContainer>.EQ(_ => _.Metadata.Version, existingVersion));
var result = _db.FindAndModify(new FindAndModifyArgs
VersionReturned = FindAndModifyDocumentVersion.Modified,
Query = documentToUpdate,
Update = Update<ProjectionContainer>.Set(_ => _.Metadata, data.Metadata)
.Set(_ => _.Projection, data.Projection),
Upsert = true
if (result.Ok) return true;
catch (MongoCommandException e)
// NOTE(review): the statement guarded by this check is missing; presumably errors other
// than a duplicate key are rethrown and a duplicate key falls through to `return false`.
if (e.CommandResult.Code != ERROR_DUPLICATE_KEY)
return false;
// Enumerates every document in the collection.
public IEnumerator<ProjectionContainer> GetEnumerator()
return _db.FindAll().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator()
return GetEnumerator();
/// <summary>
/// <see cref="IProjectionStore"/> backed by a concurrent dictionary keyed by
/// (stream id, projection version). <see cref="LoadLatest"/> and <see cref="TryUpdate"/>
/// copy containers so callers do not share instances with the store through those calls;
/// enumeration yields the stored instances themselves.
/// </summary>
public class InMemoryProjectionStore : IEnumerable<ProjectionContainer>, IProjectionStore
{
    readonly ConcurrentDictionary<Tuple<string, int>, ProjectionContainer> _projections =
        new ConcurrentDictionary<Tuple<string, int>, ProjectionContainer>();

    /// <summary>
    /// Returns a copy of the highest-version projection stored for
    /// <paramref name="streamId"/>, or <c>null</c> when the stream has none.
    /// </summary>
    public ProjectionContainer LoadLatest(string streamId)
    {
        // Highest version first; FirstOrDefault yields null when the stream has no entry.
        var value = _projections.Values.Where(_ => _.StreamId == streamId)
                                .OrderByDescending(_ => _.Metadata.Version)
                                .FirstOrDefault();
        return value == null
            ? null
            : Copy(value);
    }

    /// <summary>
    /// Replaces the entry stored under (<c>data.StreamId</c>, <paramref name="existingVersion"/>)
    /// with a copy of <paramref name="data"/> keyed by its new version.
    /// </summary>
    /// <param name="data">Projection to store; its <c>Metadata.Version</c> is the new version.</param>
    /// <param name="existingVersion">
    /// Version the caller loaded; <c>-1</c> means no prior entry is expected, so nothing is removed.
    /// </param>
    /// <returns>
    /// <c>false</c> when the expected existing entry is gone or the new key is already
    /// present; otherwise <c>true</c>.
    /// </returns>
    public bool TryUpdate(ProjectionContainer data, int existingVersion = -1)
    {
        var streamId = data.StreamId;
        var newKey = Tuple.Create(streamId, data.Metadata.Version);
        if (existingVersion != -1)
        {
            var existingKey = Tuple.Create(streamId, existingVersion);
            ProjectionContainer oldData;
            // TryRemove already reports a missing key, so a separate ContainsKey probe is redundant.
            if (!_projections.TryRemove(existingKey, out oldData))
            {
                return false;
            }
        }

        // NOTE(review): the remove above and this add are two separate dictionary
        // operations, so a concurrent LoadLatest can see the stream with no entry in between.
        return _projections.TryAdd(newKey, Copy(data));
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>Enumerates the stored containers (the stored instances, not copies).</summary>
    public IEnumerator<ProjectionContainer> GetEnumerator()
    {
        return _projections.Values.GetEnumerator();
    }

    /// <summary>Creates a new container with the same stream id, projection value and version.</summary>
    static ProjectionContainer Copy(ProjectionContainer value)
    {
        return new ProjectionContainer
        {
            StreamId = value.StreamId,
            Projection = value.Projection,
            Metadata =
                new ProjectionMetadata
                {
                    Version = value.Metadata.Version,
                }
        };
    }
}
/// <summary>
/// A stored projection: the stream it belongs to, its metadata, and the projected value.
/// </summary>
/// <remarks>
/// NOTE(review): no serialization attributes are visible on this type, while
/// <c>MongoProjectionStore.LoadLatest</c> looks documents up by id using the stream id.
/// Confirm how <see cref="StreamId"/> is mapped.
/// </remarks>
public class ProjectionContainer
{
    /// <summary>Creates a container with a fresh <see cref="ProjectionMetadata"/>, so <see cref="Metadata"/> starts non-null.</summary>
    public ProjectionContainer()
    {
        Metadata = new ProjectionMetadata();
    }

    /// <summary>Identifier of the stream this projection was built from.</summary>
    public string StreamId { get; set; }

    /// <summary>Metadata for the projection, including its version.</summary>
    public ProjectionMetadata Metadata { get; set; }

    /// <summary>The projected value.</summary>
    public int Projection { get; set; }
}

/// <summary>Metadata stored alongside a projection.</summary>
public class ProjectionMetadata
{
    /// <summary>
    /// Version of the projection; <c>ProjectionRunner</c> sets it to the highest stream
    /// revision among the commits it read.
    /// </summary>
    public int Version { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment