Skip to content

Instantly share code, notes, and snippets.

@gerektoolhy
Created February 12, 2016 08:40
Show Gist options
  • Save gerektoolhy/61918fc63b67ced322bf to your computer and use it in GitHub Desktop.
Save gerektoolhy/61918fc63b67ced322bf to your computer and use it in GitHub Desktop.
// Modelled similarly to IOwinContext
public interface IBusContext
{
IDictionary<string, object> Environment { get; }
T Get<T>(string key);
IBusContext Set<T>(string key, T value);
}
// Middleware for bus pipeline.
// Example middlewares:
// UseDIContainerMiddleware - Autofac, Ninject, TinyIOC, etc, creates lifetime scope per bus message
// Session per request middleware
// CorrelationId middleware
// Authentication middleware
// Metrics and health middlewares..
public interface IBusMiddleware
{
void Init(Func<IBusContext, bool> nextMiddleware);
bool Handle(IBusContext context);
}
public abstract class BaseBusMiddleware : IBusMiddleware
{
private Func<IBusContext, bool> next;
public void Init(Func<IBusContext, bool> nextMiddleware)
{
next = nextMiddleware;
}
public virtual bool Next(IBusContext context)
{
return next == null || next(context);
}
public abstract bool Handle(IBusContext context);
}
public class BusAutofacMiddleware : BaseBusMiddleware
{
private readonly IContainer container;
public BusAutofacMiddleware(IContainer container)
{
if (container == null)
throw new ArgumentNullException("container");
this.container = container;
}
public override bool Handle(IBusContext context)
{
if (context == null)
throw new ArgumentNullException("context");
using (var lifetimeScope = container.BeginLifetimeScope(
BusLifetimeTags.ScopePerBusMessageTag))
{
context.Set(BusLifetimeTags.ContextTag, lifetimeScope);
return Next(context);
}
}
}
public class BusCorrelationContextMiddleware : BaseBusMiddleware
{
public override bool Handle(IBusContext context)
{
if (!(context.GetMessage<object>() is IMessageHeaders))
throw new ArgumentNullException("context", "message does not exist in context");
// Extract correlaction Id from bus message headers, or generate new
var correlationId = Guid.NewGuid().ToString();
var busMessage = context.GetMessage<object>() as IMessageHeaders;
if (busMessage.Headers.ContainsKey(AddCorrelationIdToOutgoingHeaders.CorrelationIdHeaderKey))
correlationId = busMessage.Headers[AddCorrelationIdToOutgoingHeaders.CorrelationIdHeaderKey];
// Persist
LogicalThreadContext.Properties[LoggingContextKeys.CorrelationIdKey] = correlationId;
return Next(context);
}
}
// Possible examples of message mutators - bus correlation id mutator,
// CustomAuthenticationScheme header - adds authentication headers, if required,
// etc
public interface IOutgoingMessageMutator
{
IMessageHeaders Mutate(IMessageHeaders message);
}
public interface IMessageHeaders
{
Guid Id { get; set; }
IDictionary<string, string> Headers { get; set; }
}
public class BusMessage : Message, IMessageHeaders
{
public IDictionary<string, string> Headers { get; set; }
public BusMessage()
{
Headers = new Dictionary<string, string>();
}
}
/// <summary>
/// A wrapper on top of 3rd party bus publisher (JustSaying).
///
/// Adds extra functionality such as message mutators
/// </summary>
public interface IBusPublisher
{
void Publish(Message message);
}
public class BusPublisher : IBusPublisher
{
private readonly IEnumerable<IOutgoingMessageMutator> mutators;
// Pass in just eat publisher)
public BusPublisher(IEnumerable<IOutgoingMessageMutator> mutators, IJustEatPublisher justEatPublisher)
{}
public void Publish(Message message)
{
message = mutators.Aggregate(message, (current, m) => (Message)m.Mutate(current as IMessageHeaders));
justEatPublisher.Publish(message);
}
}
// Adds correlation Id to message headers if not already set
public class AddCorrelationIdToOutgoingHeaders : IOutgoingMessageMutator
{
public const string CorrelationIdHeaderKey = "X-CorrelationId";
const string CorrelationIdLoggingKey = "correlationId";
public IMessageHeaders Mutate(IMessageHeaders message)
{
if (message.Headers.ContainsKey(CorrelationIdHeaderKey))
return message;
var correlationId = LogicalThreadContext.Properties[CorrelationIdLoggingKey];
if (correlationId != null)
message.Headers.Add(CorrelationIdHeaderKey, correlationId.ToString());
return message;
}
}
// A trimmed down version of bootstrapping
public class BusStartup
{
public void StartListening()
{
Metric.Config
.WithBus(handlerResolver)
.Use(ctx => new ErrorMeterMiddleware(ctx, "Errors"))
.Use(ctx => new ActiveMessageHandlerCounterMiddleware(ctx, "ActiveMessageHandlers"))
.Use(ctx => new MessageHandlerTimerMiddleware(ctx, "MessageHandlerDuration"))
.Use(ctx => new TimerForEachMessageHandlerMiddleware(ctx));
handlerResolver
.Use(() => new BusCorrelationContextMiddleware())
.Use(() => new BusAutofacMiddleware(IoC.Container))
.Use(() => new BusSessionPerMessageMiddleware())
.Use(() => new BusAuthenticationMiddleware())
.Use(() => new BusClaimsPrincipalMiddleware())
;
justSaying = CreateMeABus
.InRegion(awsSettings.Regions)
.WithSerialisationFactory(..)
.WithNamingStrategy(() => busNamingConvention);
busConfigurator.RegisterHandlers(justSaying);
justSaying.StartListening();
}
}
}
// A JustSaying bus handler resolver, hooks up middlewares
public class BusHandlerResolver : IHandlerResolver
{
private readonly IContainer container;
private readonly List<Func<IBusMiddleware>> middlewares = new List<Func<IBusMiddleware>>();
public BusHandlerResolver(IContainer container)
{
this.container = container;
}
public BusHandlerResolver Use(Func<IBusMiddleware> middleware)
{
this.middlewares.Add(middleware);
return this;
}
public IEnumerable<IHandler<T>> ResolveHandlers<T>()
{
return new IHandler<T>[] { new BusUnitOfWorkHandler<T>(container, middlewares, s => s.Resolve<IMessageHandler<T>>()) };
}
}
// JustSaying bus message handler supporting middlewares,
public class BusUnitOfWorkHandler<T> : IHandler<T>
{
private static readonly ILog logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IContainer container;
private readonly List<Func<IBusMiddleware>> busMiddleware;
private List<IBusMiddleware> mid;
private readonly Func<ILifetimeScope, IMessageHandler<T>> finalHandlerFunc;
// busMiddleware - a list of ordered middlewares
// finalHandlerFunc - the final handler that will process the message
public BusUnitOfWorkHandler(IContainer container, List<Func<IBusMiddleware>> busMiddleware, Func<ILifetimeScope, IMessageHandler<T>> finalHandlerFunc)
{
this.container = container;
this.busMiddleware = busMiddleware;
this.finalHandlerFunc = finalHandlerFunc;
}
/// <summary>
/// Sets-up middleware chain
/// </summary>
private void Init()
{
mid = busMiddleware.Select(y => y()).ToList();
var current = mid[0];
foreach (var next in mid)
{
if (current == next)
continue;
var next1 = next;
current.Init(c => next1.Handle(c));
current = next;
}
}
public bool Handle(T message)
{
var msg = message as Message;
try
{
var context = new BusContext(container);
context.Set(BusContextAutofacExtensionsMethods.BusMessageTag, message);
Init();
// set message handler as the last step
mid[mid.Count - 1].Init(c => ResolveFinalHandler(context).Handle(c, message));
var result = mid[0].Handle(context);
return result;
}
catch (Exception e)
{
return false;
}
}
// Resolves final handler from Autofac container
private IMessageHandler<T> ResolveFinalHandler(IBusContext context)
{
return finalHandlerFunc(context.GetBusLifetimeScope());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment