Created
February 12, 2016 08:40
-
-
Save gerektoolhy/61918fc63b67ced322bf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Context object handed to every middleware in the bus pipeline.
/// Modelled similarly to IOwinContext.
/// </summary>
public interface IBusContext
{
    /// <summary>
    /// String-keyed dictionary of objects exposed by the context.
    /// NOTE(review): presumably the storage that Get and Set operate on - confirm against the implementation.
    /// </summary>
    IDictionary<string, object> Environment { get; }

    /// <summary>
    /// Returns the value registered under <paramref name="key"/>, typed as <typeparamref name="T"/>.
    /// NOTE(review): behaviour for a missing key is not visible here - confirm against the implementation.
    /// </summary>
    /// <param name="key">Key of the value to read.</param>
    T Get<T>(string key);

    /// <summary>
    /// Stores <paramref name="value"/> under <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Key to store the value under.</param>
    /// <param name="value">Value to store.</param>
    /// <returns>An <see cref="IBusContext"/> (presumably this instance, to allow call chaining - confirm).</returns>
    IBusContext Set<T>(string key, T value);
}
// Middleware for the bus pipeline.
// Example middlewares:
//   - DI container middleware (Autofac, Ninject, TinyIoC, etc.) that creates a lifetime scope per bus message
//   - Session-per-request middleware
//   - Correlation id middleware
//   - Authentication middleware
//   - Metrics and health middlewares
/// <summary>
/// A single step in the bus message pipeline.
/// </summary>
public interface IBusMiddleware
{
    /// <summary>
    /// Supplies the delegate that represents the step following this middleware.
    /// </summary>
    /// <param name="nextMiddleware">Delegate invoking the next step with the current context.</param>
    void Init(Func<IBusContext, bool> nextMiddleware);

    /// <summary>
    /// Processes the message described by <paramref name="context"/>.
    /// </summary>
    /// <param name="context">Context of the bus message being processed.</param>
    /// <returns>
    /// The outcome of handling. NOTE(review): presumably true means the message was handled
    /// successfully - confirm against the bus library's handler contract.
    /// </returns>
    bool Handle(IBusContext context);
}
/// <summary>
/// Base class for bus middlewares: stores the delegate for the following step and
/// exposes <see cref="Next"/> so derived classes can continue the pipeline.
/// </summary>
public abstract class BaseBusMiddleware : IBusMiddleware
{
    // Delegate for the following pipeline step; stays null until Init is called.
    private Func<IBusContext, bool> successor;

    /// <summary>
    /// Processes the message; implemented by each concrete middleware.
    /// </summary>
    /// <param name="context">Context of the bus message being processed.</param>
    public abstract bool Handle(IBusContext context);

    /// <summary>
    /// Remembers the delegate that invokes the following pipeline step.
    /// </summary>
    /// <param name="nextMiddleware">Delegate for the next step; may be null.</param>
    public void Init(Func<IBusContext, bool> nextMiddleware)
    {
        this.successor = nextMiddleware;
    }

    /// <summary>
    /// Invokes the following pipeline step.
    /// </summary>
    /// <param name="context">Context to pass along.</param>
    /// <returns>True when no following step has been set; otherwise that step's result.</returns>
    public virtual bool Next(IBusContext context)
    {
        if (this.successor == null)
        {
            return true;
        }

        return this.successor(context);
    }
}
/// <summary>
/// Middleware that opens an Autofac lifetime scope tagged with
/// BusLifetimeTags.ScopePerBusMessageTag for each message, publishes it on the
/// context and disposes it once the rest of the pipeline has returned.
/// </summary>
public class BusAutofacMiddleware : BaseBusMiddleware
{
    private readonly IContainer rootContainer;

    /// <summary>
    /// Creates the middleware.
    /// </summary>
    /// <param name="container">Container from which per-message scopes are created.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="container"/> is null.</exception>
    public BusAutofacMiddleware(IContainer container)
    {
        if (container == null)
        {
            throw new ArgumentNullException("container");
        }

        rootContainer = container;
    }

    /// <summary>
    /// Runs the remainder of the pipeline inside a new lifetime scope.
    /// </summary>
    /// <param name="context">Context of the bus message being processed.</param>
    /// <returns>The result of the following pipeline step.</returns>
    /// <exception cref="ArgumentNullException">When <paramref name="context"/> is null.</exception>
    public override bool Handle(IBusContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }

        using (var messageScope = rootContainer.BeginLifetimeScope(BusLifetimeTags.ScopePerBusMessageTag))
        {
            // Later steps look the scope up on the context under BusLifetimeTags.ContextTag.
            context.Set(BusLifetimeTags.ContextTag, messageScope);

            var outcome = Next(context);
            return outcome;
        }
    }
}
/// <summary>
/// Middleware that makes a correlation id available to logging for the duration of
/// message handling. The id is taken from the incoming message headers when present,
/// otherwise a new one is generated.
/// </summary>
public class BusCorrelationContextMiddleware : BaseBusMiddleware
{
    /// <summary>
    /// Resolves the correlation id for the current message, stores it in the log4net
    /// logical thread context and continues the pipeline.
    /// </summary>
    /// <param name="context">Context of the bus message being processed.</param>
    /// <returns>The result of the following pipeline step.</returns>
    /// <exception cref="ArgumentNullException">
    /// When <paramref name="context"/> is null, or when the context holds no message
    /// implementing <see cref="IMessageHeaders"/>.
    /// </exception>
    public override bool Handle(IBusContext context)
    {
        if (context == null)
            throw new ArgumentNullException("context");

        // Fetch and cast the message once instead of once for the check and once for use.
        var busMessage = context.GetMessage<object>() as IMessageHeaders;
        if (busMessage == null)
            throw new ArgumentNullException("context", "message does not exist in context");

        // Extract the correlation id from the bus message headers, or generate a new one.
        // A message whose Headers dictionary is null is treated as carrying no correlation id.
        string correlationId;
        var headers = busMessage.Headers;
        if (headers == null
            || !headers.TryGetValue(AddCorrelationIdToOutgoingHeaders.CorrelationIdHeaderKey, out correlationId))
        {
            correlationId = Guid.NewGuid().ToString();
        }

        // Persist
        LogicalThreadContext.Properties[LoggingContextKeys.CorrelationIdKey] = correlationId;

        return Next(context);
    }
}
// Possible examples of message mutators:
//   - bus correlation id mutator
//   - custom authentication scheme mutator that adds authentication headers, if required
//   - etc.
/// <summary>
/// Hook applied to a message before it is published.
/// </summary>
public interface IOutgoingMessageMutator
{
    /// <summary>
    /// Applies this mutator to an outgoing message.
    /// </summary>
    /// <param name="message">The outgoing message.</param>
    /// <returns>The message to continue with (the same instance or a replacement).</returns>
    IMessageHeaders Mutate(IMessageHeaders message);
}
/// <summary>
/// A message that carries an identifier and a set of string headers.
/// </summary>
public interface IMessageHeaders
{
    /// <summary>Identifier of the message.</summary>
    Guid Id { get; set; }

    /// <summary>Header name/value pairs attached to the message.</summary>
    IDictionary<string, string> Headers { get; set; }
}
/// <summary>
/// Base bus message that adds a header dictionary on top of Message.
/// </summary>
public class BusMessage : Message, IMessageHeaders
{
    /// <summary>
    /// Creates a message with an empty header dictionary.
    /// </summary>
    public BusMessage()
    {
        this.Headers = new Dictionary<string, string>();
    }

    /// <summary>Header name/value pairs attached to the message.</summary>
    public IDictionary<string, string> Headers { get; set; }
}
/// <summary>
/// A wrapper on top of the 3rd party bus publisher (JustSaying).
///
/// Adds extra functionality such as message mutators.
/// </summary>
public interface IBusPublisher
{
    /// <summary>
    /// Publishes a message to the bus.
    /// </summary>
    /// <param name="message">The message to publish.</param>
    void Publish(Message message);
}
/// <summary>
/// Publishes messages through the wrapped publisher after running every registered
/// <see cref="IOutgoingMessageMutator"/> over them.
/// </summary>
public class BusPublisher : IBusPublisher
{
    private readonly IEnumerable<IOutgoingMessageMutator> mutators;
    private readonly IJustEatPublisher justEatPublisher;

    /// <summary>
    /// Creates the publisher.
    /// </summary>
    /// <param name="mutators">Mutators applied, in enumeration order, to each outgoing message.</param>
    /// <param name="justEatPublisher">The underlying publisher that actually sends the message.</param>
    /// <exception cref="ArgumentNullException">When either argument is null.</exception>
    public BusPublisher(IEnumerable<IOutgoingMessageMutator> mutators, IJustEatPublisher justEatPublisher)
    {
        if (mutators == null)
            throw new ArgumentNullException("mutators");
        if (justEatPublisher == null)
            throw new ArgumentNullException("justEatPublisher");

        // Both dependencies must be stored: Publish reads them on every call.
        this.mutators = mutators;
        this.justEatPublisher = justEatPublisher;
    }

    /// <summary>
    /// Applies the mutators to <paramref name="message"/> and publishes the result.
    /// </summary>
    /// <param name="message">The message to publish.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="message"/> is null.</exception>
    public void Publish(Message message)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        var current = message;

        // Mutators operate on IMessageHeaders; a message without header support is
        // published untouched rather than being handed to the mutators as null.
        if (message is IMessageHeaders)
        {
            foreach (var mutator in mutators)
            {
                // Each mutator receives the output of the previous one.
                current = (Message)mutator.Mutate((IMessageHeaders)current);
            }
        }

        justEatPublisher.Publish(current);
    }
}
/// <summary>
/// Adds the correlation id to the message headers if it is not already set.
/// </summary>
public class AddCorrelationIdToOutgoingHeaders : IOutgoingMessageMutator
{
    /// <summary>Name of the message header that carries the correlation id.</summary>
    public const string CorrelationIdHeaderKey = "X-CorrelationId";

    // Key under which the correlation id is read from the log4net logical thread context.
    // NOTE(review): BusCorrelationContextMiddleware writes the id under
    // LoggingContextKeys.CorrelationIdKey - confirm both keys have the same value.
    const string CorrelationIdLoggingKey = "correlationId";

    /// <summary>
    /// Copies the correlation id from the logical thread context into the message
    /// headers, unless the header is already present or no id is available.
    /// </summary>
    /// <param name="message">The outgoing message.</param>
    /// <returns>The same message instance.</returns>
    /// <exception cref="ArgumentNullException">When <paramref name="message"/> is null.</exception>
    public IMessageHeaders Mutate(IMessageHeaders message)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        // An existing header always wins.
        if (message.Headers != null && message.Headers.ContainsKey(CorrelationIdHeaderKey))
            return message;

        var correlationId = LogicalThreadContext.Properties[CorrelationIdLoggingKey];
        if (correlationId == null)
            return message;

        // Create the header dictionary on demand so the id is not lost on messages
        // whose Headers were never initialised.
        if (message.Headers == null)
            message.Headers = new Dictionary<string, string>();

        message.Headers.Add(CorrelationIdHeaderKey, correlationId.ToString());
        return message;
    }
}
// A trimmed down version of bootstrapping.
// NOTE(review): this is an abbreviated sample - handlerResolver, justSaying, awsSettings,
// busNamingConvention and busConfigurator are not declared in this snippet, and the
// serialisation factory argument is elided.
public class BusStartup
{
    /// <summary>
    /// Wires metrics and pipeline middlewares, builds the bus, registers the handlers
    /// and starts listening.
    /// </summary>
    public void StartListening()
    {
        // Metrics middlewares.
        Metric.Config
            .WithBus(handlerResolver)
            .Use(metricsContext => new ErrorMeterMiddleware(metricsContext, "Errors"))
            .Use(metricsContext => new ActiveMessageHandlerCounterMiddleware(metricsContext, "ActiveMessageHandlers"))
            .Use(metricsContext => new MessageHandlerTimerMiddleware(metricsContext, "MessageHandlerDuration"))
            .Use(metricsContext => new TimerForEachMessageHandlerMiddleware(metricsContext));

        // Pipeline middlewares, registered in the order listed.
        handlerResolver
            .Use(() => new BusCorrelationContextMiddleware())
            .Use(() => new BusAutofacMiddleware(IoC.Container))
            .Use(() => new BusSessionPerMessageMiddleware())
            .Use(() => new BusAuthenticationMiddleware())
            .Use(() => new BusClaimsPrincipalMiddleware());

        // Build the bus.
        justSaying = CreateMeABus
            .InRegion(awsSettings.Regions)
            .WithSerialisationFactory(..)
            .WithNamingStrategy(() => busNamingConvention);

        busConfigurator.RegisterHandlers(justSaying);
        justSaying.StartListening();
    }
}
} | |
/// <summary>
/// A JustSaying bus handler resolver; hooks the registered middlewares up in front of
/// the message handler resolved from the container.
/// </summary>
public class BusHandlerResolver : IHandlerResolver
{
    private readonly IContainer container;

    // Middleware factories in registration order.
    private readonly List<Func<IBusMiddleware>> middlewares = new List<Func<IBusMiddleware>>();

    /// <summary>
    /// Creates the resolver.
    /// </summary>
    /// <param name="container">Container passed on to the handlers this resolver creates.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="container"/> is null.</exception>
    public BusHandlerResolver(IContainer container)
    {
        if (container == null)
            throw new ArgumentNullException("container");

        this.container = container;
    }

    /// <summary>
    /// Appends a middleware factory to the pipeline.
    /// </summary>
    /// <param name="middleware">Factory that creates the middleware instance.</param>
    /// <returns>This resolver, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException">When <paramref name="middleware"/> is null.</exception>
    public BusHandlerResolver Use(Func<IBusMiddleware> middleware)
    {
        // Reject null here; otherwise the failure only surfaces later, while a message is being handled.
        if (middleware == null)
            throw new ArgumentNullException("middleware");

        this.middlewares.Add(middleware);
        return this;
    }

    /// <summary>
    /// Returns a single handler that runs the middleware pipeline and then the
    /// IMessageHandler resolved from the lifetime scope for message type T.
    /// </summary>
    public IEnumerable<IHandler<T>> ResolveHandlers<T>()
    {
        // The handler receives the live middleware list, so middlewares registered
        // after this call are still seen by it.
        return new IHandler<T>[]
        {
            new BusUnitOfWorkHandler<T>(container, middlewares, s => s.Resolve<IMessageHandler<T>>())
        };
    }
}
/// <summary>
/// JustSaying bus message handler supporting middlewares: for every message it builds
/// a fresh middleware chain, runs it, and finishes with the final message handler.
/// </summary>
public class BusUnitOfWorkHandler<T> : IHandler<T>
{
    private static readonly ILog logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
    private readonly IContainer container;
    private readonly List<Func<IBusMiddleware>> busMiddleware;
    private readonly Func<ILifetimeScope, IMessageHandler<T>> finalHandlerFunc;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="container">Container used to create the per-message bus context.</param>
    /// <param name="busMiddleware">Ordered list of middleware factories.</param>
    /// <param name="finalHandlerFunc">Resolves the final handler that will process the message.</param>
    public BusUnitOfWorkHandler(IContainer container, List<Func<IBusMiddleware>> busMiddleware, Func<ILifetimeScope, IMessageHandler<T>> finalHandlerFunc)
    {
        this.container = container;
        this.busMiddleware = busMiddleware;
        this.finalHandlerFunc = finalHandlerFunc;
    }

    /// <summary>
    /// Instantiates the middlewares and links each one to its successor; the last one
    /// is linked to <paramref name="finalStep"/>.
    /// </summary>
    /// <param name="finalStep">Delegate that runs the final message handler.</param>
    /// <returns>The linked middlewares in pipeline order; empty when none are registered.</returns>
    private List<IBusMiddleware> BuildChain(Func<IBusContext, bool> finalStep)
    {
        var chain = busMiddleware.Select(factory => factory()).ToList();

        for (var i = 0; i < chain.Count; i++)
        {
            if (i == chain.Count - 1)
            {
                // set message handler as the last step
                chain[i].Init(finalStep);
            }
            else
            {
                var successor = chain[i + 1];
                chain[i].Init(c => successor.Handle(c));
            }
        }

        return chain;
    }

    /// <summary>
    /// Handles a bus message by running it through the middleware chain.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <returns>
    /// The result of the pipeline, or false when any step throws (the exception is logged).
    /// </returns>
    public bool Handle(T message)
    {
        try
        {
            var context = new BusContext(container);
            context.Set(BusContextAutofacExtensionsMethods.BusMessageTag, message);

            Func<IBusContext, bool> finalStep = c => ResolveFinalHandler(context).Handle(c, message);

            // The chain is kept in a local (not an instance field) so that concurrent
            // Handle calls on the same handler instance cannot overwrite each other's chain.
            var chain = BuildChain(finalStep);

            // With no middlewares registered, go straight to the final handler.
            return chain.Count == 0
                ? finalStep(context)
                : chain[0].Handle(context);
        }
        catch (Exception e)
        {
            // Report failure to the bus, but never lose the exception itself.
            logger.Error(string.Format("Unhandled exception while handling bus message of type {0}", typeof(T).FullName), e);
            return false;
        }
    }

    // Resolves the final handler from the lifetime scope stored on the bus context.
    private IMessageHandler<T> ResolveFinalHandler(IBusContext context)
    {
        return finalHandlerFunc(context.GetBusLifetimeScope());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment