@aleksandrov
Created July 8, 2014 17:54
For NEventStore PR #306
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="FluentAssertions" version="3.0.107" targetFramework="net45" />
<package id="Machine.Specifications" version="0.8.3" targetFramework="net45" />
<package id="mongocsharpdriver" version="1.9.2" targetFramework="net45" />
<package id="NEventStore" version="5.0.1.2" targetFramework="net45" />
<package id="NEventStore.Persistence.MongoDB" version="5.0.0.101" targetFramework="net45" />
</packages>
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CommonDomain;
using CommonDomain.Core;
using CommonDomain.Persistence;
using CommonDomain.Persistence.EventStore;
using FluentAssertions;
using Machine.Specifications;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Serialization;
namespace NeventStore.Reproduce
{
public class Spec
{
#region aggregate
public class TestAggregate : AggregateBase
{
private string value;
public TestAggregate(Guid id)
{
RaiseEvent(new Created() { Id = id });
}
public TestAggregate()
{
Register<Created>(Apply);
Register<Added>(Apply);
}
private void Apply(Added message)
{
value += message.Value;
}
private void Apply(Created message)
{
Id = message.Id;
}
public void Sum(int v)
{
RaiseEvent(new Added() { Id = Id, Value = v });
}
}
public class Created
{
public Guid Id { get; set; }
}
public class Added
{
public Guid Id { get; set; }
public int Value { get; set; }
}
#endregion
#region aggregate factory
class Factory : IConstructAggregates
{
public IAggregate Build(Type type, Guid id, IMemento snapshot)
{
return (IAggregate) Activator.CreateInstance(type);
}
}
class Dispatcher : IDispatchCommits
{
public void Dispose()
{
}
public void Dispatch(ICommit commit)
{
}
}
class TestConflictDetector : IDetectConflicts
{
public bool WasUsed;
public void Register<TUncommitted, TCommitted>(ConflictDelegate handler) where TUncommitted : class where TCommitted : class
{
}
public bool ConflictsWith(IEnumerable<object> uncommittedEvents, IEnumerable<object> committedEvents)
{
WasUsed = true;
return true;
}
}
#endregion
Establish context = () =>
{
id = Guid.NewGuid();
store = Wireup.Init()
.UsingMongoPersistence(() => "mongodb://127.0.0.1/reproduce?w=1", new DocumentObjectSerializer())
.UsingAsynchronousDispatchScheduler(new Dispatcher())
.LogToConsoleWindow()
.Build();
conflictDetector = new TestConflictDetector();
// Pass the test double so that should_call_Conflict_Detector can observe WasUsed.
repository = new EventStoreRepository(store, new Factory(), conflictDetector);
CreateAggregate();
task1 = Task.Factory.StartNew(() => Sum(111));
task2 = Task.Factory.StartNew(() => Sum(222));
};
Because of = () => action = () => Task.WaitAll(task1, task2);
private It should_throw_an_error = () => action.ShouldThrow<ConcurrencyException>();
private It should_call_Conflict_Detector = () => conflictDetector.WasUsed.Should().BeTrue();
private static void CreateAggregate()
{
var aggregate = new TestAggregate(id);
repository.Save(aggregate, Guid.NewGuid(), headers => { });
}
private static void Sum(int value)
{
var aggregate = repository.GetById<TestAggregate>(id, 0);
aggregate.Sum(value);
repository.Save(aggregate, Guid.NewGuid(), headers => { });
}
private static IStoreEvents store;
private static EventStoreRepository repository;
private static Guid id;
private static Task task1;
private static Task task2;
private static Action action;
private static TestConflictDetector conflictDetector;
}
}
@aleksandrov

It is a bit noisy, but anyway.

This test produces the following in the log:

2014/07/08 17:44:36.55 - 9 - NEventStore.OptimisticEventStream - The underlying stream '389ad66d-a93c-44e9-b8a5-fcc5389bf60c' has changed since the last known commit, refreshing the stream.

It shows that the persistence engine detected the changed stream and reloaded it. After that, the original CommonDomain calls the IDetectConflicts implementation. ConflictDetector takes the uncommitted changes on one side and the committed changes on the other to make its decision, and refuses the second commit attempt.

But now stream.ClearChanges() is called BEFORE the ConflictDetector call, so ConflictDetector is unable to detect the conflict (because there are no uncommitted changes left). As a result, the aggregate saves the same changes again.
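
To make the ordering problem concrete, here is a minimal sketch with made-up types. `FakeStream` and `OrderingSketch` are hypothetical stand-ins, not the NEventStore or CommonDomain classes, and the conflict rule is deliberately simplified (any uncommitted event paired with any newly committed event counts as a conflict). It only shows why clearing the uncommitted changes first leaves the detector with nothing to compare.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an event stream; NOT the NEventStore type.
public class FakeStream
{
    public List<object> Uncommitted = new List<object>();

    public void ClearChanges()
    {
        Uncommitted.Clear();
    }
}

public static class OrderingSketch
{
    // Simplified rule (an assumption for this sketch): a conflict exists
    // when there is at least one uncommitted event and at least one event
    // committed by someone else in the meantime.
    private static bool ConflictsWith(IEnumerable<object> uncommitted, IEnumerable<object> committed)
    {
        return uncommitted.Any() && committed.Any();
    }

    // Expected order: ask the detector first, clear the changes afterwards.
    public static bool DetectThenClear(FakeStream stream, IList<object> committedByOther)
    {
        bool conflict = ConflictsWith(stream.Uncommitted, committedByOther);
        stream.ClearChanges();
        return conflict;
    }

    // Problematic order: the changes are cleared before the detector runs,
    // so the detector always sees an empty uncommitted list.
    public static bool ClearThenDetect(FakeStream stream, IList<object> committedByOther)
    {
        stream.ClearChanges();
        return ConflictsWith(stream.Uncommitted, committedByOther);
    }

    public static void Main()
    {
        var committedByOther = new List<object> { "Added(111)" };

        var first = new FakeStream();
        first.Uncommitted.Add("Added(222)");

        var second = new FakeStream();
        second.Uncommitted.Add("Added(222)");

        Console.WriteLine(DetectThenClear(first, committedByOther));  // True
        Console.WriteLine(ClearThenDetect(second, committedByOther)); // False
    }
}
```

In the second case the conflict goes unnoticed, which matches the behaviour described above: nothing is refused and the save is attempted again.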

If my description is unclear, ask me and I'll try to explain in more detail.

Hope that helps!
