Skip to content

Instantly share code, notes, and snippets.

@BennieCopeland
Last active January 15, 2018 04:48
Show Gist options
  • Save BennieCopeland/7a77b4e4d4a264fac3836832df2bb090 to your computer and use it in GitHub Desktop.
Save BennieCopeland/7a77b4e4d4a264fac3836832df2bb090 to your computer and use it in GitHub Desktop.
Basic domain objects
using System;
/// <summary>
/// Thrown by an <see cref="IEventStore"/> implementation when the requested aggregate cannot be found.
/// </summary>
public class AggregateNotFoundException : Exception
{
    /// <summary>
    /// Initializes a new instance of the <see cref="AggregateNotFoundException"/> class with the default message.
    /// </summary>
    /// <param name="aggregateType">The type of the aggregate that was not found.</param>
    /// <param name="aggregateId">The identity of the aggregate that was not found.</param>
    public AggregateNotFoundException(string aggregateType, string aggregateId)
        : base(DescribeMissing(aggregateType, aggregateId))
    {
        this.AggregateId = aggregateId;
        this.AggregateType = aggregateType;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="AggregateNotFoundException"/> class with a caller-supplied message.
    /// </summary>
    /// <param name="aggregateType">The type of the aggregate that was not found.</param>
    /// <param name="aggregateId">The identity of the aggregate that was not found.</param>
    /// <param name="message">The error message that explains the reason for this exception.</param>
    public AggregateNotFoundException(string aggregateType, string aggregateId, string message)
        : base(message)
    {
        this.AggregateId = aggregateId;
        this.AggregateType = aggregateType;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="AggregateNotFoundException"/> class with the default message
    /// and the exception that caused it.
    /// </summary>
    /// <param name="aggregateType">The type of the aggregate that was not found.</param>
    /// <param name="aggregateId">The identity of the aggregate that was not found.</param>
    /// <param name="innerException">The exception that caused this one, or a null reference if there is none.</param>
    public AggregateNotFoundException(string aggregateType, string aggregateId, Exception innerException)
        : base(DescribeMissing(aggregateType, aggregateId), innerException)
    {
        this.AggregateId = aggregateId;
        this.AggregateType = aggregateType;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="AggregateNotFoundException"/> class with a caller-supplied
    /// message and the exception that caused it.
    /// </summary>
    /// <param name="aggregateType">The type of the aggregate that was not found.</param>
    /// <param name="aggregateId">The identity of the aggregate that was not found.</param>
    /// <param name="message">The error message that explains the reason for this exception.</param>
    /// <param name="innerException">The exception that caused this one, or a null reference if there is none.</param>
    public AggregateNotFoundException(string aggregateType, string aggregateId, string message, Exception innerException)
        : base(message, innerException)
    {
        this.AggregateId = aggregateId;
        this.AggregateType = aggregateType;
    }

    /// <summary>
    /// Gets the identity of the aggregate that was not found.
    /// </summary>
    public string AggregateId { get; }

    /// <summary>
    /// Gets the type of the aggregate that was not found.
    /// </summary>
    public string AggregateType { get; }

    /// <summary>
    /// Builds the default message used by the constructors that do not take an explicit message.
    /// </summary>
    /// <param name="aggregateType">The type of the aggregate that was not found.</param>
    /// <param name="aggregateId">The identity of the aggregate that was not found.</param>
    /// <returns>The formatted default message.</returns>
    private static string DescribeMissing(string aggregateType, string aggregateId)
    {
        return $"Aggregate type [{aggregateType}] with id [{aggregateId}] not found.";
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
/// <summary>
/// Implements the <see cref="IAggregateRoot{T}"/> interface and provides the base functionality for aggregate types.
/// </summary>
/// <remarks>
/// Every event is dispatched through reflection to a non-public instance method named <c>Apply</c> on the concrete
/// aggregate whose single parameter type is exactly the runtime type of the event.
/// </remarks>
/// <typeparam name="T">The type of <see cref="IdentityBase{T}"/> used by this aggregate as its Id.</typeparam>
public abstract class AggregateRootBase<T> : IAggregateRoot<T>
    where T : IdentityBase<T>
{
    /// <summary>
    /// Private list that holds the uncommitted changes.
    /// </summary>
    private readonly List<IEvent> changes = new List<IEvent>();

    /// <summary>
    /// Backing field for <see cref="ApplyMethods"/>; stays null until the first event is applied.
    /// </summary>
    private Dictionary<Type, MethodInfo> applyMethods;

    /// <summary>
    /// Provides values for commit status.
    /// </summary>
    private enum CommitStatus
    {
        Invalid = 0,
        NotCommitted = 1,
        AlreadyCommitted = 2
    }

    /// <summary>
    /// Gets the Id of the aggregate.
    /// </summary>
    public abstract T Id { get; }

    /// <summary>
    /// Gets an enumerable list of the aggregate's uncommitted changes.
    /// </summary>
    public IEnumerable<IEvent> UncommittedChanges
    {
        get
        {
            return this.changes;
        }
    }

    /// <summary>
    /// Gets the current version of the aggregate.
    /// </summary>
    /// <remarks>
    /// Starts at -1 and is incremented once for every successfully applied event, committed or not.
    /// </remarks>
    public int Version { get; private set; } = -1;

    /// <summary>
    /// Gets the lazily built map from event type to the matching Apply method of the aggregate.
    /// </summary>
    /// <remarks>
    /// This complicated reflection is because dotnet core reflection does not implement InvokeMember.
    /// Put into a lazily loaded property so that the reflection runtime performance hit only happens once per
    /// instance, and only on the first ApplyChange.
    /// </remarks>
    private Dictionary<Type, MethodInfo> ApplyMethods
    {
        get
        {
            if (this.applyMethods == null)
            {
                const BindingFlags flags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.InvokeMethod;

                this.applyMethods = this.GetType()
                    .GetTypeInfo()
                    .GetMethods(flags)
                    .Where(m => m.Name == "Apply")
                    .Where(m =>
                    {
                        // Only single-parameter overloads whose parameter type implements IEvent are handlers.
                        var @params = m.GetParameters();
                        return @params.Length == 1
                            && @params[0].ParameterType.GetTypeInfo().GetInterfaces().Contains(typeof(IEvent));
                    })
                    .ToDictionary(m => m.GetParameters()[0].ParameterType, m => m);
            }

            return this.applyMethods;
        }
    }

    /// <summary>
    /// Applies the list of committed events.
    /// </summary>
    /// <param name="committedEvents">The list of committed events.</param>
    /// <remarks>
    /// Concrete implementations should use this method to rehydrate the aggregate from a list of events stored in the event store.
    /// Events applied through this method are not added to <see cref="UncommittedChanges"/>.
    /// </remarks>
    /// <exception cref="ArgumentNullException">committedEvents is null.</exception>
    /// <exception cref="NullEventException">committedEvents contains a null item.</exception>
    /// <exception cref="UnrecognizedEventException">The aggregate has no Apply method for one of the events.</exception>
    public void LoadFromHistory(IEnumerable<IEvent> committedEvents)
    {
        if (committedEvents == null)
        {
            throw new ArgumentNullException(nameof(committedEvents));
        }

        foreach (var @event in committedEvents)
        {
            this.ApplyChange(@event, CommitStatus.AlreadyCommitted);
        }
    }

    /// <summary>
    /// Marks the uncommitted changes as committed.
    /// </summary>
    /// <remarks>
    /// The event store should use this method to mark the changes as committed after they have been persisted to the event stream.
    /// </remarks>
    public void MarkChangesAsCommitted()
    {
        this.changes.Clear();
    }

    /// <summary>
    /// Calls the Apply method on the aggregate to apply the change and adds the event to the <see cref="UncommittedChanges"/> list.
    /// </summary>
    /// <param name="evnt">The event to apply.</param>
    /// <remarks>
    /// Concrete implementations should use this method to apply a new change to the aggregate.
    /// </remarks>
    /// <exception cref="NullEventException">evnt is null.</exception>
    /// <exception cref="UnrecognizedEventException">The aggregate has no Apply method for the event's type.</exception>
    /// <example>
    /// <code>
    /// public class SomeAggregate : AggregateRootBase&lt;SomeAggregateId&gt;
    /// {
    ///     public void DoSomething(string someValue)
    ///     {
    ///         // Check someValue to satisfy invariants
    ///         if(string.IsNullOrEmpty(someValue))
    ///         {
    ///             throw new Exception();
    ///         }
    ///
    ///         // Create event recording the change
    ///         SomethingHappenedEvent e = new SomethingHappenedEvent(someValue);
    ///
    ///         // Use the base class's ApplyChange method to apply the change and record it as uncommitted.
    ///         this.ApplyChange(e);
    ///     }
    ///
    ///     // Called by the base class's ApplyChange method
    ///     private void Apply(SomethingHappenedEvent e)
    ///     {
    ///         this.SomeValue = e.SomeValue;
    ///     }
    /// }
    /// </code>
    /// </example>
    protected void ApplyChange(IEvent evnt)
    {
        this.ApplyChange(evnt, CommitStatus.NotCommitted);
    }

    /// <summary>
    /// Calls the Apply method on the aggregate to apply the change.
    /// </summary>
    /// <param name="evnt">The event to apply.</param>
    /// <param name="commitStatus">Signifies if the event is a committed change or a new change.</param>
    /// <exception cref="NullEventException">evnt is null.</exception>
    /// <exception cref="UnrecognizedEventException">The aggregate has no Apply method for the event's type.</exception>
    private void ApplyChange(IEvent evnt, CommitStatus commitStatus)
    {
        if (evnt == null)
        {
            throw new NullEventException();
        }

        MethodInfo method;
        if (!this.ApplyMethods.TryGetValue(evnt.GetType(), out method))
        {
            // Report the event's runtime type name; nameof(evnt) would only ever yield the literal "evnt".
            throw new UnrecognizedEventException(this.GetType().Name, evnt.GetType().Name);
        }

        // NOTE: MethodInfo.Invoke wraps exceptions thrown by the Apply method in a TargetInvocationException.
        method.Invoke(this, new object[] { evnt });

        // The version only advances once the handler has completed without throwing.
        ++this.Version;

        if (commitStatus == CommitStatus.NotCommitted)
        {
            this.changes.Add(evnt);
        }
    }
}
using System.Collections.Generic;
/// <summary>
/// Contract for an event-sourced aggregate root: it exposes its identity, version and pending events,
/// and can be rebuilt from previously committed events.
/// </summary>
/// <typeparam name="T">The type of id that identifies this aggregate.</typeparam>
public interface IAggregateRoot<out T> where T : IIdentity
{
    /// <summary>
    /// Gets the Id of the aggregate.
    /// </summary>
    T Id { get; }

    /// <summary>
    /// Gets the current version of the aggregate.
    /// </summary>
    int Version { get; }

    /// <summary>
    /// Gets the changes made to the aggregate that have not yet been committed.
    /// </summary>
    IEnumerable<IEvent> UncommittedChanges { get; }

    /// <summary>
    /// Applies the list of committed events.
    /// </summary>
    /// <param name="committedEvents">The list of committed events.</param>
    /// <remarks>
    /// Intended for rehydrating the aggregate from the events held in the event store.
    /// </remarks>
    void LoadFromHistory(IEnumerable<IEvent> committedEvents);

    /// <summary>
    /// Marks the uncommitted changes as committed.
    /// </summary>
    /// <remarks>
    /// Intended to be called by the event store once the changes have been persisted to the event stream.
    /// </remarks>
    void MarkChangesAsCommitted();
}
using System;
/// <summary>
/// Base class for aggregate identities. Implements <see cref="IIdentity"/> and <see cref="IEquatable{T}"/> and
/// gives identities value semantics: two identities are equal when they have the same runtime type and the same
/// underlying string.
/// </summary>
/// <typeparam name="T">The class that will be an aggregate identity.</typeparam>
public abstract class IdentityBase<T> : IIdentity, IEquatable<T>
    where T : IdentityBase<T>
{
    /// <summary>
    /// The underlying identity string; never null or empty once constructed.
    /// </summary>
    private readonly string identity;

    /// <summary>
    /// Initializes a new instance of the <see cref="IdentityBase{T}"/> class.
    /// </summary>
    /// <param name="anIdentity">The identity value.</param>
    /// <exception cref="ArgumentNullException">anIdentity is null or empty.</exception>
    protected IdentityBase(string anIdentity)
    {
        if (string.IsNullOrEmpty(anIdentity))
        {
            throw new System.ArgumentNullException(nameof(anIdentity));
        }

        this.identity = anIdentity;
    }

    /// <summary>
    /// Determines whether two identities are value equal.
    /// </summary>
    /// <param name="left">The identity on the left side of the operator.</param>
    /// <param name="right">The identity on the right side of the operator.</param>
    /// <returns><c>true</c> if both are null or the left identity equals the right one; otherwise, <c>false</c>.</returns>
    public static bool operator ==(IdentityBase<T> left, IdentityBase<T> right) => AreEqual(left, right);

    /// <summary>
    /// Determines whether an identity is value equal to an arbitrary object.
    /// </summary>
    /// <param name="left">The identity on the left side of the operator.</param>
    /// <param name="right">The object on the right side of the operator.</param>
    /// <returns><c>true</c> if both are null or the left identity equals the right object; otherwise, <c>false</c>.</returns>
    public static bool operator ==(IdentityBase<T> left, object right) => AreEqual(left, right);

    /// <summary>
    /// Determines whether an arbitrary object is value equal to an identity.
    /// </summary>
    /// <param name="left">The object on the left side of the operator.</param>
    /// <param name="right">The identity on the right side of the operator.</param>
    /// <returns><c>true</c> if both are null or the left object equals the right identity; otherwise, <c>false</c>.</returns>
    public static bool operator ==(object left, IdentityBase<T> right) => AreEqual(left, right);

    /// <summary>
    /// Determines whether two identities are not value equal.
    /// </summary>
    /// <param name="left">The identity on the left side of the operator.</param>
    /// <param name="right">The identity on the right side of the operator.</param>
    /// <returns><c>true</c> if the left identity does not equal the right one; otherwise, <c>false</c>.</returns>
    public static bool operator !=(IdentityBase<T> left, IdentityBase<T> right) => !AreEqual(left, right);

    /// <summary>
    /// Determines whether an identity is not value equal to an arbitrary object.
    /// </summary>
    /// <param name="left">The identity on the left side of the operator.</param>
    /// <param name="right">The object on the right side of the operator.</param>
    /// <returns><c>true</c> if the left identity does not equal the right object; otherwise, <c>false</c>.</returns>
    public static bool operator !=(IdentityBase<T> left, object right) => !AreEqual(left, right);

    /// <summary>
    /// Determines whether an arbitrary object is not value equal to an identity.
    /// </summary>
    /// <param name="left">The object on the left side of the operator.</param>
    /// <param name="right">The identity on the right side of the operator.</param>
    /// <returns><c>true</c> if the left object does not equal the right identity; otherwise, <c>false</c>.</returns>
    public static bool operator !=(object left, IdentityBase<T> right) => !AreEqual(left, right);

    /// <summary>
    /// Determines whether the specified object is value equal to the current object.
    /// </summary>
    /// <param name="obj">The object to compare with the current object.</param>
    /// <returns><c>true</c> if the specified object is value equal to the current object; otherwise, <c>false</c>.</returns>
    public override bool Equals(object obj) => this.Equals(obj as T);

    /// <summary>
    /// Determines whether the specified identity is value equal to the current identity.
    /// </summary>
    /// <param name="right">The identity to compare with the current identity.</param>
    /// <returns><c>true</c> if the specified identity is value equal to the current identity; otherwise, <c>false</c>.</returns>
    public bool Equals(T right)
    {
        if (object.ReferenceEquals(right, null))
        {
            return false;
        }

        // Same instance is trivially equal; otherwise both the runtime type and the string must match.
        return object.ReferenceEquals(this, right)
            || (this.GetType() == right.GetType() && this.identity == right.identity);
    }

    /// <summary>
    /// Returns the hash code of the underlying identity string.
    /// </summary>
    /// <returns>A hashcode for the current identity.</returns>
    public override int GetHashCode() => this.identity.GetHashCode();

    /// <summary>
    /// Returns a string that represents the value of the identity.
    /// </summary>
    /// <returns>A string that represents the value of the identity.</returns>
    public override string ToString() => this.identity;

    /// <summary>
    /// Returns a string that represents the value of the identity.
    /// <para>
    /// Supported format: t for "{typename}:{id}"
    /// </para>
    /// </summary>
    /// <param name="format">Specifies the format of the returned string; any value other than "t" yields the plain value.</param>
    /// <returns>A string that represents the identity in the requested format.</returns>
    public string ToString(string format)
    {
        return format == "t"
            ? $"{this.GetType().Name}:{this.identity}"
            : this.identity;
    }

    /// <summary>
    /// Shared null-safe comparison behind every equality operator: two nulls are equal, a null left side never
    /// equals a non-null right side, and otherwise the left operand's <see cref="object.Equals(object)"/> decides.
    /// </summary>
    /// <param name="left">The left operand.</param>
    /// <param name="right">The right operand.</param>
    /// <returns><c>true</c> if the operands are considered equal; otherwise, <c>false</c>.</returns>
    private static bool AreEqual(object left, object right)
    {
        if (object.ReferenceEquals(left, null))
        {
            return object.ReferenceEquals(right, null);
        }

        return left.Equals(right);
    }
}
/// <summary>
/// A marker interface that represents an event.
/// </summary>
/// <remarks>
/// Declares no members; it only identifies a type as an event.
/// </remarks>
public interface IEvent
{
}
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Contract for an event store: creating identities, restoring aggregates from their event streams and
/// saving their uncommitted changes.
/// </summary>
public interface IEventStore
{
    /// <summary>
    /// Returns the next identity for the requested type.
    /// </summary>
    /// <typeparam name="T">The type of identity to create.</typeparam>
    /// <returns>An identity.</returns>
    T NextIdentity<T>() where T : IIdentity;

    /// <summary>
    /// Returns an aggregate restored from its event stream asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of aggregate to restore.</typeparam>
    /// <param name="id">The id of the aggregate to reconstruct.</param>
    /// <param name="cancellationToken">The cancellation instruction.</param>
    /// <returns>A restored aggregate.</returns>
    Task<T> GetAsync<T>(string id, CancellationToken cancellationToken = default(CancellationToken)) where T : IAggregateRoot<IIdentity>;

    /// <summary>
    /// Returns an aggregate restored from its event stream asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of aggregate to restore.</typeparam>
    /// <param name="id">The id of the aggregate to reconstruct.</param>
    /// <param name="version">The version of the aggregate to restore.</param>
    /// <param name="cancellationToken">The cancellation instruction.</param>
    /// <returns>A restored aggregate.</returns>
    Task<T> GetAsync<T>(string id, int version, CancellationToken cancellationToken = default(CancellationToken)) where T : IAggregateRoot<IIdentity>;

    /// <summary>
    /// Saves the uncommitted changes to the aggregate's event stream asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of aggregate to save.</typeparam>
    /// <param name="model">The aggregate to save.</param>
    /// <param name="cancellationToken">The cancellation instruction.</param>
    /// <returns>A task representing the asynchronous operation.</returns>
    Task SaveAsync<T>(T model, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IAggregateRoot<IIdentity>;
}
/// <summary>
/// A marker interface that represents an identity.
/// </summary>
/// <remarks>
/// Declares no members; it only identifies a type as an identity.
/// </remarks>
public interface IIdentity
{
}
/// <summary>
/// An <see cref="IEventStore"/> implementation that reads and appends aggregate event streams through an
/// <see cref="IStreamStore"/>, storing events and metadata as JSON.
/// </summary>
public class MsSqlStreamStoreEventStore : IEventStore
{
    /// <summary>
    /// Shared serializer settings. Type names are embedded so that events can be deserialized back to their
    /// concrete types.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>TypeNameHandling.All</c> lets the stored payload choose the type to instantiate, which is
    /// unsafe if stream contents can come from an untrusted source. Consider a restrictive serialization binder.
    /// </remarks>
    private static readonly JsonSerializerSettings SerializerSettings =
        new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

    private readonly IStreamStore store;
    private readonly IMetadataService metadataService;

    /// <summary>
    /// Initializes a new instance of the <see cref="MsSqlStreamStoreEventStore"/> class.
    /// </summary>
    /// <param name="store">The stream store used to read and append event streams.</param>
    /// <param name="metadataService">The service that supplies the metadata stored alongside each event.</param>
    public MsSqlStreamStoreEventStore(IStreamStore store, IMetadataService metadataService)
    {
        this.store = store;
        this.metadataService = metadataService;
    }

    /// <summary>
    /// Returns an aggregate restored from its event stream asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of aggregate to restore.</typeparam>
    /// <param name="id">The id of the aggregate to reconstruct.</param>
    /// <param name="cancellationToken">The cancellation instruction.</param>
    /// <returns>A restored aggregate.</returns>
    /// <exception cref="AggregateNotFoundException">No stream exists for the given id.</exception>
    public async Task<T> GetAsync<T>(string id, CancellationToken cancellationToken = default(CancellationToken)) where T : IAggregateRoot<IIdentity>
    {
        return await this.GetAsync<T>(id, int.MaxValue - 1, cancellationToken);
    }

    /// <summary>
    /// Returns an aggregate restored from its event stream asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of aggregate to restore.</typeparam>
    /// <param name="id">The id of the aggregate to reconstruct.</param>
    /// <param name="version">The version of the aggregate to restore.</param>
    /// <param name="cancellationToken">The cancellation instruction.</param>
    /// <returns>A restored aggregate.</returns>
    /// <exception cref="AggregateNotFoundException">No stream exists for the given id.</exception>
    public async Task<T> GetAsync<T>(string id, int version, CancellationToken cancellationToken = default(CancellationToken)) where T : IAggregateRoot<IIdentity>
    {
        var aggregate = (T)Activator.CreateInstance(typeof(T), nonPublic: true);

        // Versions are zero-based, so version + 1 messages are requested; clamp to avoid int overflow.
        var maxCount = version == int.MaxValue ? int.MaxValue : version + 1;
        var stream = await this.store.ReadStreamForwards(id, StreamVersion.Start, maxCount, cancellationToken: cancellationToken);

        if (stream.Status == PageReadStatus.StreamNotFound)
        {
            throw new AggregateNotFoundException(typeof(T).Name, id);
        }

        // Await each payload instead of blocking on .Result, and materialize a typed list because
        // LoadFromHistory requires IEnumerable<IEvent> rather than a sequence of object.
        var events = new System.Collections.Generic.List<IEvent>();
        foreach (var message in stream.Messages)
        {
            var json = await message.GetJsonData(cancellationToken);
            events.Add((IEvent)JsonConvert.DeserializeObject(json, SerializerSettings));
        }

        aggregate.LoadFromHistory(events);
        return aggregate;
    }

    /// <summary>
    /// Returns the next identity for the requested type.
    /// </summary>
    /// <typeparam name="T">The type of identity to create; it is constructed with a new GUID string as its only argument.</typeparam>
    /// <returns>An identity.</returns>
    public T NextIdentity<T>() where T : IIdentity
    {
        return (T)Activator.CreateInstance(typeof(T), Guid.NewGuid().ToString());
    }

    /// <summary>
    /// Saves the uncommitted changes to the aggregate's event stream asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of aggregate to save.</typeparam>
    /// <param name="model">The aggregate to save.</param>
    /// <param name="cancellationToken">The cancellation instruction.</param>
    /// <returns>A task representing the asynchronous operation.</returns>
    /// <exception cref="ArgumentNullException">model is null.</exception>
    /// <exception cref="NullReferenceException">The Id of model is null.</exception>
    public async Task SaveAsync<T>(T model, CancellationToken cancellationToken = default(CancellationToken))
        where T : class, IAggregateRoot<IIdentity>
    {
        if (model == null)
        {
            throw new ArgumentNullException(nameof(model));
        }

        // TODO remove this check by rewriting AggregateBase to include a constructor that requires an Id.
        // NOTE(review): the exception type is kept for backward compatibility; ArgumentException would be more idiomatic.
        if (model.Id == null)
        {
            throw new NullReferenceException("The Id property of model cannot be null.");
        }

        var events = model.UncommittedChanges
            .Select(e => new NewStreamMessage(
                Guid.NewGuid(),
                e.GetType().Name,
                JsonConvert.SerializeObject(e, SerializerSettings),
                JsonConvert.SerializeObject(this.metadataService.GetMetadata(), SerializerSettings)))
            .ToArray();

        // The stream version expected before this append is the aggregate's version minus the pending events.
        var expectedVersion = model.Version - events.Length;

        await this.store.AppendToStream(model.Id.ToString(), expectedVersion, events, cancellationToken);

        // Only clear the pending changes once the append has succeeded.
        model.MarkChangesAsCommitted();
    }
}
using System;
/// <summary>
/// Thrown when a null event is handed to an <see cref="AggregateRootBase{T}"/> for processing.
/// </summary>
public class NullEventException : Exception
{
    /// <summary>
    /// The fixed message carried by every instance of this exception.
    /// </summary>
    private const string NullEventMessage = "Event cannot be null.";

    /// <summary>
    /// Initializes a new instance of the <see cref="NullEventException"/> class with its fixed message.
    /// </summary>
    public NullEventException()
        : base(NullEventMessage)
    {
    }
}
using System;
/// <summary>
/// Thrown when a type inheriting from <see cref="AggregateRootBase{T}"/> has no Apply method for the event being processed.
/// </summary>
public class UnrecognizedEventException : Exception
{
    /// <summary>
    /// Initializes a new instance of the <see cref="UnrecognizedEventException"/> class.
    /// </summary>
    /// <param name="aggregateType">The type of aggregate that this exception was thrown from.</param>
    /// <param name="eventType">The event type that has no corresponding Apply method.</param>
    public UnrecognizedEventException(string aggregateType, string eventType)
        : base(DescribeMissingHandler(aggregateType, eventType))
    {
    }

    /// <summary>
    /// Builds the exception message from the aggregate and event type names.
    /// </summary>
    /// <param name="aggregateType">The type of aggregate that this exception was thrown from.</param>
    /// <param name="eventType">The event type that has no corresponding Apply method.</param>
    /// <returns>The formatted message.</returns>
    private static string DescribeMissingHandler(string aggregateType, string eventType)
    {
        return $"Aggregate type [{aggregateType}] does not implement an Apply method for event type [{eventType}]";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment