Skip to content

Instantly share code, notes, and snippets.

@andreabalducci
Last active August 29, 2015 14:08
Show Gist options
  • Save andreabalducci/2b2ad587f47afdbec185 to your computer and use it in GitHub Desktop.
Save andreabalducci/2b2ad587f47afdbec185 to your computer and use it in GitHub Desktop.
Dealing with eventual consistency using read model "promises" with NEventStore & MongoDB (WIP)
using Jarvis.DocumentStore.Core.CommandHandlers.HandleHandlers;
using Jarvis.DocumentStore.Core.Domain.Document.Commands;
using Jarvis.DocumentStore.Core.Domain.Handle;
using Jarvis.DocumentStore.Core.ReadModel;
namespace Jarvis.DocumentStore.Core.CommandHandlers.DocumentHandlers
{
/// <summary>
/// Command handler for <see cref="CreateDocument"/>: creates the document aggregate
/// and then links the handle carried by the command to that document.
/// </summary>
public class CreateDocumentCommandHandler : DocumentCommandHandler<CreateDocument>
{
    private readonly IHandleMapper _mapper;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="mapper">Maps a document handle to the id used to load the Handle aggregate.</param>
    /// <param name="writer">Accepted by the constructor but not stored or used by this class.</param>
    public CreateDocumentCommandHandler(IHandleMapper mapper, IHandleWriter writer)
    {
        _mapper = mapper;
    }

    /// <summary>
    /// Creates the document identified by the command's aggregate id, then links its handle.
    /// </summary>
    /// <param name="cmd">The command carrying aggregate id, blob id, handle info and hash.</param>
    protected override void Execute(CreateDocument cmd)
    {
        // NOTE(review): the trailing 'true' presumably allows the aggregate to be created
        // when it does not exist yet - confirm against DocumentCommandHandler.FindAndModify.
        FindAndModify(
            cmd.AggregateId,
            doc => doc.Create(cmd.AggregateId, cmd.BlobId, cmd.HandleInfo, cmd.Hash),
            true);

        LinkHandle(cmd);
    }

    /// <summary>
    /// Loads the Handle aggregate for the command's handle (initializing it on first use),
    /// links it to the document, stores the custom data and saves the aggregate.
    /// </summary>
    /// <param name="cmd">The command whose handle info is being linked.</param>
    private void LinkHandle(CreateDocument cmd)
    {
        var requestedHandle = cmd.HandleInfo.Handle;
        var handleId = _mapper.Map(requestedHandle);
        var aggregate = Repository.GetById<Handle>(handleId);

        // An aggregate that has not been created yet must be initialized before it can be linked.
        if (!aggregate.HasBeenCreated)
        {
            aggregate.Initialize(handleId, requestedHandle);
        }

        aggregate.Link(cmd.AggregateId);
        aggregate.SetCustomData(cmd.HandleInfo.CustomData);

        // The third argument is a callback that deliberately does nothing.
        Repository.Save(aggregate, cmd.MessageId, _ => { });
    }
}
}
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CQRS.Kernel.Events;
using Jarvis.DocumentStore.Core.Domain.Handle.Events;
using Jarvis.DocumentStore.Core.ReadModel;
using NEventStore;
namespace Jarvis.DocumentStore.Core.EventHandlers
{
/// <summary>
/// Projection that translates Handle domain events into calls on an
/// <see cref="IHandleWriter"/>.
/// </summary>
public class HandleProjection : AbstractProjection,
    IEventHandler<HandleInitialized>,
    IEventHandler<HandleLinked>,
    IEventHandler<HandleCustomDataSet>,
    IEventHandler<HandleDeleted>
{
    private readonly IHandleWriter _writer;

    /// <summary>
    /// Creates the projection.
    /// </summary>
    /// <param name="writer">Writer that receives every change made by this projection.</param>
    public HandleProjection(IHandleWriter writer)
    {
        _writer = writer;
    }

    /// <summary>
    /// Delegates to <see cref="IHandleWriter.Init"/>.
    /// </summary>
    public override void SetUp()
    {
        _writer.Init();
    }

    /// <summary>
    /// Delegates to <see cref="IHandleWriter.Drop"/>.
    /// </summary>
    public override void Drop()
    {
        _writer.Drop();
    }

    /// <summary>
    /// No writer call is made for this event.
    /// </summary>
    public void On(HandleInitialized e)
    {
    }

    /// <summary>
    /// Confirms the link between the handle and the document, passing the event's
    /// checkpoint token (parsed as a long) as the projection position.
    /// </summary>
    public void On(HandleLinked e)
    {
        var projectedAt = LongCheckpoint.Parse(e.CheckpointToken).LongValue;
        _writer.ConfirmLink(e.Handle, e.DocumentId, projectedAt);
    }

    /// <summary>
    /// Stores the custom data carried by the event for the handle.
    /// </summary>
    public void On(HandleCustomDataSet e)
    {
        _writer.UpdateCustomData(e.Handle, e.CustomData);
    }

    /// <summary>
    /// Asks the writer to delete the handle, passing the event's checkpoint token
    /// (parsed as a long) as the projection position.
    /// </summary>
    public void On(HandleDeleted e)
    {
        var projectedAt = LongCheckpoint.Parse(e.CheckpointToken).LongValue;
        _writer.Delete(e.Handle, projectedAt);
    }
}
}
using System.Linq;
using CQRS.Shared.ReadModel;
using Jarvis.DocumentStore.Core.Domain.Document;
using Jarvis.DocumentStore.Core.Domain.Handle;
using Jarvis.DocumentStore.Core.Model;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using MongoDB.Driver.Linq;
namespace Jarvis.DocumentStore.Core.ReadModel
{
/// <summary>
/// Read model for a document handle. The handle itself is the document id
/// (<c>[BsonId]</c>). <see cref="CreatetAt"/> and <see cref="ProjectedAt"/> are
/// compared by <see cref="IsPending"/> to tell whether the entry is still pending.
/// </summary>
public class HandleReadModel : IReadModel
{
    /// <summary>Handle identifying this read model; used as the BSON id.</summary>
    [BsonId]
    public DocumentHandle Handle { get; private set; }

    /// <summary>Document this handle points at.</summary>
    public DocumentId DocumentId { get; private set; }

    // NOTE(review): "CreatetAt" looks like a typo for "CreatedAt"; it is kept because the
    // property name is public and is referenced by the queries built elsewhere in this file.
    /// <summary>Position compared against <see cref="ProjectedAt"/> by <see cref="IsPending"/>.</summary>
    public long CreatetAt { get; private set; }

    /// <summary>Position compared against <see cref="CreatetAt"/> by <see cref="IsPending"/>.</summary>
    public long ProjectedAt { get; private set; }

    /// <summary>Custom data attached to the handle.</summary>
    public HandleCustomData CustomData { get; private set; }

    /// <summary>File name associated with the handle.</summary>
    public FileNameWithExtension FileName { get; private set; }

    /// <summary>
    /// Creates a read model that only knows its handle.
    /// </summary>
    /// <param name="handle">The handle identifying the read model.</param>
    public HandleReadModel(DocumentHandle handle)
    {
        Handle = handle;
    }

    /// <summary>
    /// Creates a read model with its handle, linked document and file name.
    /// </summary>
    /// <param name="handle">The handle identifying the read model.</param>
    /// <param name="documentid">The document the handle points at.</param>
    /// <param name="fileName">The file name associated with the handle.</param>
    public HandleReadModel(DocumentHandle handle, DocumentId documentid, FileNameWithExtension fileName)
        : this(handle)
    {
        DocumentId = documentid;
        FileName = fileName;
    }

    /// <summary>
    /// Tells whether this entry is still pending.
    /// </summary>
    /// <returns><c>true</c> when <see cref="CreatetAt"/> is greater than <see cref="ProjectedAt"/>.</returns>
    public bool IsPending()
    {
        return CreatetAt > ProjectedAt;
    }
}
/// <summary>
/// Write-side access to the <see cref="HandleReadModel"/> collection.
/// The behavior notes below describe what the HandleWriter implementation in this file does.
/// </summary>
public interface IHandleWriter
{
/// <summary>
/// Records a "promise" for a handle. HandleWriter upserts the entry for
/// <paramref name="handle"/>, setting DocumentId, CreatetAt and FileName.
/// </summary>
/// <param name="handle">Handle of the entry to create or update.</param>
/// <param name="fileName">File name stored on the entry.</param>
/// <param name="id">Document id stored on the entry.</param>
/// <param name="createdAt">Value stored in CreatetAt.</param>
void Promise(DocumentHandle handle, FileNameWithExtension fileName, DocumentId id, long createdAt);
/// <summary>
/// Looks up the entry whose id is <paramref name="handle"/>.
/// </summary>
/// <returns>The matching read model (presumably null when none exists - confirm against the driver).</returns>
HandleReadModel FindOneById(DocumentHandle handle);
/// <summary>
/// Drops the underlying collection.
/// </summary>
void Drop();
/// <summary>
/// Prepares the storage. HandleWriter creates an ascending index on Handle and CreatetAt.
/// </summary>
void Init();
/// <summary>
/// Confirms a link. HandleWriter sets DocumentId and ProjectedAt on the entry for
/// <paramref name="handle"/>, but only when its stored CreatetAt is less than or equal to
/// <paramref name="projectedAt"/>; no entry is created when nothing matches.
/// </summary>
void ConfirmLink(DocumentHandle handle, DocumentId id, long projectedAt);
/// <summary>
/// Sets CustomData on the entry for <paramref name="handle"/>.
/// </summary>
void UpdateCustomData(DocumentHandle handle, HandleCustomData customData);
/// <summary>
/// Removes the entry for <paramref name="handle"/>. HandleWriter removes it only when its
/// stored CreatetAt is less than or equal to <paramref name="projectedAt"/>.
/// </summary>
void Delete(DocumentHandle handle, long projectedAt);
/// <summary>
/// All entries as a queryable ordered by Handle.
/// </summary>
IQueryable<HandleReadModel> AllSortedByHandle { get;}
}
/// <summary>
/// MongoDB-backed <see cref="IHandleWriter"/> working on the
/// <see cref="HandleReadModel"/> collection.
/// </summary>
public class HandleWriter : IHandleWriter
{
    private readonly MongoCollection<HandleReadModel> _collection;

    /// <summary>
    /// Binds the writer to the <see cref="HandleReadModel"/> collection of the given database.
    /// </summary>
    /// <param name="readModelDb">Database holding the read model collection.</param>
    public HandleWriter(MongoDatabase readModelDb)
    {
        var collectionName = CollectionNames.GetCollectionName<HandleReadModel>();
        _collection = readModelDb.GetCollection<HandleReadModel>(collectionName);
    }

    /// <summary>
    /// All entries ordered by handle.
    /// </summary>
    public IQueryable<HandleReadModel> AllSortedByHandle
    {
        get
        {
            return _collection.AsQueryable().OrderBy(x => x.Handle);
        }
    }

    /// <summary>
    /// Upserts the entry for <paramref name="handle"/>, setting DocumentId, CreatetAt and FileName.
    /// </summary>
    public void Promise(DocumentHandle handle, FileNameWithExtension fileName, DocumentId id, long createdAt)
    {
        var byHandle = Query<HandleReadModel>.EQ(x => x.Handle, handle);
        var promised = Update<HandleReadModel>
            .Set(x => x.DocumentId, id)
            .Set(x => x.CreatetAt, createdAt)
            .Set(x => x.FileName, fileName);

        _collection.FindAndModify(new FindAndModifyArgs
        {
            Query = byHandle,
            Update = promised,
            Upsert = true
        });
    }

    /// <summary>
    /// Sets DocumentId and ProjectedAt on the entry for <paramref name="handle"/> when its
    /// CreatetAt is less than or equal to <paramref name="projectedAt"/>. Nothing is
    /// inserted when no entry matches (no upsert).
    /// </summary>
    public void ConfirmLink(DocumentHandle handle, DocumentId id, long projectedAt)
    {
        var confirmed = Update<HandleReadModel>
            .Set(x => x.DocumentId, id)
            .Set(x => x.ProjectedAt, projectedAt);

        _collection.FindAndModify(new FindAndModifyArgs
        {
            Query = CreatedAtOrBefore(handle, projectedAt),
            Update = confirmed
        });
    }

    /// <summary>
    /// Sets CustomData on the entry for <paramref name="handle"/>.
    /// </summary>
    public void UpdateCustomData(DocumentHandle handle, HandleCustomData customData)
    {
        var byHandle = Query.And(
            Query<HandleReadModel>.EQ(x => x.Handle, handle));
        var customDataChange = Update<HandleReadModel>
            .Set(x => x.CustomData, customData);

        _collection.FindAndModify(new FindAndModifyArgs
        {
            Query = byHandle,
            Update = customDataChange
        });
    }

    /// <summary>
    /// Removes the entry for <paramref name="handle"/> when its CreatetAt is less than or
    /// equal to <paramref name="projectedAt"/>.
    /// </summary>
    public void Delete(DocumentHandle handle, long projectedAt)
    {
        /* TODO: handle Delete promise */
        _collection.FindAndRemove(new FindAndRemoveArgs
        {
            Query = CreatedAtOrBefore(handle, projectedAt)
        });
    }

    /// <summary>
    /// Looks up the entry whose id is <paramref name="handle"/>.
    /// </summary>
    public HandleReadModel FindOneById(DocumentHandle handle)
    {
        var id = BsonValue.Create(handle);
        return _collection.FindOneById(id);
    }

    /// <summary>
    /// Creates an ascending index on Handle and CreatetAt.
    /// </summary>
    public void Init()
    {
        var keys = IndexKeys<HandleReadModel>.Ascending(x => x.Handle, x => x.CreatetAt);
        _collection.CreateIndex(keys);
    }

    /// <summary>
    /// Drops the whole collection.
    /// </summary>
    public void Drop()
    {
        _collection.Drop();
    }

    /// <summary>
    /// Inserts a new entry that only carries its handle.
    /// </summary>
    public void Create(DocumentHandle documentHandle)
    {
        var entry = new HandleReadModel(documentHandle);
        _collection.Insert(entry);
    }

    // Builds the query "entry for this handle whose CreatetAt is <= checkpoint",
    // shared by ConfirmLink and Delete.
    private static IMongoQuery CreatedAtOrBefore(DocumentHandle handle, long checkpoint)
    {
        return Query.And(
            Query<HandleReadModel>.EQ(x => x.Handle, handle),
            Query<HandleReadModel>.LTE(x => x.CreatetAt, checkpoint));
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Jarvis.DocumentStore.Core.Domain.Document;
using Jarvis.DocumentStore.Core.Domain.Document.Events;
using Jarvis.DocumentStore.Core.ReadModel;
using NEventStore;
namespace Jarvis.DocumentStore.Core.EvenstoreHooks
{
/// <summary>
/// NEventStore pipeline hook that, right after a commit of a Document aggregate,
/// writes a "promise" for the handle of a newly created document through
/// <see cref="IHandleWriter.Promise"/>.
/// </summary>
public class ReadModelPromisesHook : PipelineHookBase
{
    // Commit header that carries the full type name of the aggregate.
    private const string AggregateTypeHeader = "AggregateType";

    private static readonly string DocumentTypeName = typeof (Document).FullName;

    private readonly IHandleWriter _handleWriter;

    /// <summary>
    /// Creates the hook.
    /// </summary>
    /// <param name="handleWriter">Writer used to record the promise.</param>
    public ReadModelPromisesHook(IHandleWriter handleWriter)
    {
        _handleWriter = handleWriter;
    }

    /// <summary>
    /// Inspects a commit and, when it belongs to a Document aggregate and contains a
    /// <see cref="DocumentCreated"/> event, promises the handle using the commit's
    /// checkpoint token (parsed as a long) as the creation position.
    /// Commits without the "AggregateType" header, or whose header is not the Document
    /// type name, are ignored.
    /// </summary>
    /// <param name="committed">The commit that has just been persisted.</param>
    public override void PostCommit(ICommit committed)
    {
        // Single lookup instead of ContainsKey followed by the indexer.
        object rawAggregateType;
        if (!committed.Headers.TryGetValue(AggregateTypeHeader, out rawAggregateType))
        {
            return;
        }

        // A header value that is not a string cannot name the Document aggregate:
        // skip the commit rather than throwing InvalidCastException from the hook.
        var aggregateType = rawAggregateType as string;
        if (aggregateType != DocumentTypeName)
        {
            return;
        }

        // Only the first DocumentCreated event of the commit is considered.
        var docCreated = committed.Events
            .Select(x => x.Body)
            .OfType<DocumentCreated>()
            .FirstOrDefault();

        if (docCreated == null)
        {
            return;
        }

        _handleWriter.Promise(
            docCreated.HandleInfo.Handle,
            docCreated.HandleInfo.FileName,
            (DocumentId)docCreated.AggregateId,
            LongCheckpoint.Parse(committed.CheckpointToken).LongValue
        );
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment