Skip to content

Instantly share code, notes, and snippets.

@haf
Created January 17, 2012 16:03
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save haf/1627224 to your computer and use it in GitHub Desktop.
Save haf/1627224 to your computer and use it in GitHub Desktop.
Spike MassTransit EventStore Saga Repo
// Copyright 2007-2011 Henrik Feldt
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.EventStoreIntegration
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using EventStore;
using EventStore.Persistence;
using Exceptions;
using Pipeline;
using Saga;
using Util;
using log4net;
/// <summary>
/// Contract for a saga whose state is rebuilt from, and persisted as, a sequence of events.
/// The repository in this file replays stored event bodies through <see cref="Apply"/> to
/// load a saga, and reads <see cref="GetUncommittedEvents"/> to persist it.
/// </summary>
public interface IEventSourcedSaga : ISaga
{
/// <summary>
/// Gets the saga version.
/// </summary>
ulong Version { get; }
/// <summary>
/// Apply an event that causes the saga to transition.
/// </summary>
/// <param name="message">The event to apply; the repository passes the body of each committed event.</param>
void Apply(object message);
/// <summary>
/// Gets the events that have not been committed yet. The repository wraps each item in an
/// event message and adds it to the saga's event stream when saving.
/// </summary>
/// <returns>A non-generic collection of event objects.</returns>
ICollection GetUncommittedEvents();
/// <summary>
/// Clears the uncommitted events; the repository calls this after it has persisted the stream.
/// </summary>
void ClearUncommittedEvents();
/// <summary>
/// Gets the messages that have not been dispatched yet.
/// NOTE(review): nothing in this file reads this collection - confirm who is expected to dispatch.
/// </summary>
/// <returns>A non-generic collection of message objects.</returns>
ICollection GetUndispatchedMessages();
/// <summary>
/// Clears the undispatched messages; the repository calls this after it has persisted the stream.
/// </summary>
void ClearUndispatchedMessages();
}
/// <summary>
/// Saga repository backed by joliver's Event Store. A saga is loaded by opening the
/// stream identified by the saga id and replaying every committed event body through
/// <see cref="IEventSourcedSaga.Apply"/>; it is saved by adding the saga's uncommitted
/// events to that stream and committing them.
/// </summary>
/// <remarks>
/// The query members (Find / Where / Select) are not implemented and throw
/// <see cref="NotImplementedException"/>.
/// </remarks>
/// <typeparam name="TSaga">The type of saga.</typeparam>
public class EventStoreRepository<TSaga> :
    ISagaRepository<TSaga>
    where TSaga : class, ISaga, IEventSourcedSaga, new()
{
    // Logger is named after this repository (it previously used InMemorySagaRepository,
    // which sent this class's output to the wrong log category).
    static readonly ILog _log = LogManager.GetLogger(typeof (EventStoreRepository<TSaga>));
    readonly IStoreEvents _eventStore;

    /// <summary>
    /// Creates a repository that reads and writes saga event streams through the given store.
    /// </summary>
    /// <param name="eventStore">The event store to open/create streams on; must not be null.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="eventStore"/> is null.</exception>
    public EventStoreRepository([NotNull] IStoreEvents eventStore)
    {
        if (eventStore == null)
        {
            throw new ArgumentNullException("eventStore");
        }

        _eventStore = eventStore;
    }

    /// <summary>
    /// Loads (or prepares to create) the saga with the given id and yields at most one
    /// handler that runs the selected callbacks against it and persists the outcome.
    /// </summary>
    /// <param name="context">The consume context the policy is evaluated against.</param>
    /// <param name="sagaId">The saga id; also used as the event stream id.</param>
    /// <param name="selector">Produces the callbacks to invoke for a saga instance and context.</param>
    /// <param name="policy">Decides whether an instance may be created, used or removed.</param>
    /// <returns>
    /// Zero or one handler: none when the policy rejects creating a missing saga or using
    /// an existing one.
    /// </returns>
    IEnumerable<Action<IConsumeContext<TMessage>>> ISagaRepository<TSaga>.GetSaga<TMessage>(
        IConsumeContext<TMessage> context, Guid sagaId,
        InstanceHandlerSelector<TSaga, TMessage> selector,
        ISagaPolicy<TSaga, TMessage> policy)
    {
        TSaga instance;
        IEventStream eventStream;

        // NOTE(review): the stream opened/created here is never disposed - confirm whether
        // IEventStream needs explicit disposal once the handler has run.
        try
        {
            eventStream = _eventStore.OpenStream(sagaId, 0, int.MaxValue);
            instance = new TSaga();

            // Rebuild the saga state by replaying its history in commit order.
            foreach (var body in eventStream.CommittedEvents.Select(message => message.Body))
            {
                instance.Apply(body);
            }
        }
        catch (StreamNotFoundException ex)
        {
            // A missing stream means the saga does not exist yet; start a fresh stream for it.
            _log.Info("could not find saga", ex);
            instance = null;
            eventStream = _eventStore.CreateStream(sagaId);
        }

        if (instance == null)
        {
            if (policy.CanCreateInstance(context))
            {
                yield return x =>
                    {
                        if (_log.IsDebugEnabled)
                        {
                            _log.DebugFormat("SAGA: {0} Creating New {1} for {2}", typeof (TSaga).ToFriendlyName(), sagaId,
                                             typeof (TMessage).ToFriendlyName());
                        }

                        try
                        {
                            TSaga created = policy.CreateInstance(x, sagaId);

                            foreach (var callback in selector(created, x))
                            {
                                callback(x);
                            }

                            // A saga that is already removable after its first message is never stored.
                            // NOTE(review): the commit id comes from RequestId; this throws if that
                            // value is null or not a GUID - confirm it is set for every saga message.
                            if (!policy.CanRemoveInstance(created))
                            {
                                Save(created, eventStream, new Guid(x.RequestId), sagaId);
                            }
                        }
                        catch (Exception ex)
                        {
                            var sex = new SagaException("Create Saga Instance Exception", typeof (TSaga), typeof (TMessage), sagaId, ex);
                            if (_log.IsErrorEnabled)
                            {
                                _log.Error(sex);
                            }

                            throw sex;
                        }
                    };
            }
            else
            {
                if (_log.IsDebugEnabled)
                {
                    _log.DebugFormat("SAGA: {0} Ignoring Missing {1} for {2}", typeof(TSaga).ToFriendlyName(), sagaId,
                                     typeof(TMessage).ToFriendlyName());
                }
            }
        }
        else
        {
            if (policy.CanUseExistingInstance(context))
            {
                yield return x =>
                    {
                        if (_log.IsDebugEnabled)
                        {
                            _log.DebugFormat("SAGA: {0} Using Existing {1} for {2}", typeof(TSaga).ToFriendlyName(), sagaId,
                                             typeof(TMessage).ToFriendlyName());
                        }

                        try
                        {
                            foreach (var callback in selector(instance, x))
                            {
                                callback(x);
                            }

                            // Persist the events raised while handling this message. The previous code
                            // had an empty statement here, so transitions of an existing saga were
                            // never committed. This mirrors the create path: only non-removable
                            // sagas are saved.
                            // NOTE(review): a removable (completed) saga leaves its stream untouched -
                            // confirm whether the final events should be committed or the stream purged.
                            if (!policy.CanRemoveInstance(instance))
                            {
                                Save(instance, eventStream, new Guid(x.RequestId), sagaId);
                            }
                        }
                        catch (Exception ex)
                        {
                            var sex = new SagaException("Existing Saga Instance Exception", typeof(TSaga), typeof(TMessage), sagaId, ex);
                            if (_log.IsErrorEnabled)
                            {
                                _log.Error(sex);
                            }

                            throw sex;
                        }
                    };
            }
            else
            {
                if (_log.IsDebugEnabled)
                {
                    _log.DebugFormat("SAGA: {0} Ignoring Existing {1} for {2}", typeof(TSaga).ToFriendlyName(), sagaId,
                                     typeof(TMessage).ToFriendlyName());
                }
            }
        }
    }

    /// <summary>
    /// Adds the saga's uncommitted events to its stream, commits them, and then clears the
    /// saga's uncommitted events and undispatched messages.
    /// </summary>
    /// <param name="instance">The saga whose pending events are persisted.</param>
    /// <param name="sagaStream">The event stream belonging to the saga.</param>
    /// <param name="commitId">The id of the commit.</param>
    /// <param name="sagaId">The saga id, reported on the exception when storage fails.</param>
    void Save(TSaga instance, IEventStream sagaStream, Guid commitId, Guid sagaId)
    {
        var stream = PrepareStream(instance, new Dictionary<string, object>(), sagaStream);
        Persist(stream, commitId, sagaId);

        instance.ClearUncommittedEvents();
        // NOTE(review): undispatched messages are cleared without being dispatched anywhere
        // in this file - confirm that is intended.
        instance.ClearUndispatchedMessages();
    }

    /// <summary>
    /// Copies the headers and the saga's uncommitted events onto the stream.
    /// </summary>
    /// <param name="saga">The saga providing the uncommitted events.</param>
    /// <param name="headers">Headers to set on the stream's uncommitted headers.</param>
    /// <param name="stream">The stream to add headers and events to.</param>
    /// <returns>The same <paramref name="stream"/> instance, for chaining.</returns>
    private IEventStream PrepareStream(TSaga saga, Dictionary<string, object> headers, IEventStream stream)
    {
        foreach (var header in headers)
        {
            stream.UncommittedHeaders[header.Key] = header.Value;
        }

        // GetUncommittedEvents returns a non-generic collection, hence the object-typed loop.
        foreach (object uncommitted in saga.GetUncommittedEvents())
        {
            stream.Add(new EventMessage { Body = uncommitted });
        }

        return stream;
    }

    /// <summary>
    /// Commits the pending changes of the stream under the given commit id.
    /// </summary>
    /// <param name="stream">The stream to commit.</param>
    /// <param name="commitId">The id of the commit.</param>
    /// <param name="sagaId">The saga id attached to the exception on storage failure.</param>
    /// <exception cref="SagaException">
    /// When the store reports a <see cref="StorageException"/>; the original exception is
    /// kept as the inner exception.
    /// </exception>
    private static void Persist(IEventStream stream, Guid commitId, Guid sagaId)
    {
        try
        {
            stream.CommitChanges(commitId);
        }
        catch (DuplicateCommitException)
        {
            // The same commit was already stored; drop the pending changes instead of failing.
            stream.ClearChanges();
        }
        catch (StorageException e)
        {
            // Keep the saga id and the underlying storage failure so the root cause is not lost.
            throw new SagaException(e.Message, typeof(TSaga), typeof(object), sagaId, e);
        }
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    IEnumerable<Guid> ISagaRepository<TSaga>.Find(ISagaFilter<TSaga> filter)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    IEnumerable<TSaga> ISagaRepository<TSaga>.Where(ISagaFilter<TSaga> filter)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    IEnumerable<TResult> ISagaRepository<TSaga>.Where<TResult>(ISagaFilter<TSaga> filter, Func<TSaga, TResult> transformer)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    IEnumerable<TResult> ISagaRepository<TSaga>.Select<TResult>(Func<TSaga, TResult> transformer)
    {
        throw new NotImplementedException();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment