Skip to content

Instantly share code, notes, and snippets.

@SteveBate
Last active August 29, 2015 14:12
Show Gist options
  • Save SteveBate/5db6081bc67354877360 to your computer and use it in GitHub Desktop.
Save SteveBate/5db6081bc67354877360 to your computer and use it in GitHub Desktop.
A version of the pipe and filters task but with the ability to also specify middleware (aspects) at the same time.
void Main()
{
// create a pipeline task
var mytask = new Task<OrderMatrixMessage>();
// register aspects
mytask.Wrap(new LoggingAspect<OrderMatrixMessage>());
mytask.Wrap(new ExceptionLoggingAspect<OrderMatrixMessage>());
mytask.Wrap(new AsyncAspect<OrderMatrixMessage>());
// register filters
mytask.Register(new Step1());
mytask.Register(new Step2());
mytask.Register(new Step3());
mytask.Register(new Step4());
// create a message. Change the PretendAnErrorHappened value to true to see the message handler kick in.
var msg = new OrderMatrixMessage { PinNo = "BB124", PretendAnErrorHappened = true };
msg.OnStart = () => { Console.WriteLine("OnStart"); };
msg.OnDone = () => { Console.WriteLine("OnDone"); };
msg.OnSuccess = () => { Console.WriteLine("OnSuccess"); };
msg.OnError = (e) => { Console.WriteLine("OnError"); };
// run it
mytask.Invoke(msg);
}
// our message
public class OrderMatrixMessage : BaseMessage
{
public string PinNo { get; set; }
public bool PretendAnErrorHappened { get; set; }
public override string ToString()
{
return string.Format("PinNo: {0}", PinNo);
}
}
// simple logging aspect
public class LoggingAspect<T> : Handler<T>
{
public void Handle(T msg)
{
Console.WriteLine(string.Format("Logging Message Before: {0}", msg));
Inner.Handle(msg);
Console.WriteLine(string.Format("Logging Message After: {0}", msg));
}
public Handler<T> Inner { get; set; }
}
// exception logging aspect
public class ExceptionLoggingAspect<T> : Handler<T>
{
public void Handle(T msg)
{
try
{
Inner.Handle(msg);
}
catch(Exception ex)
{
Console.WriteLine("Logging error: {0}", ex);
}
}
public Handler<T> Inner { get; set; }
}
// An aspect to invoke the filters on a background thread
public class AsyncAspect<T> : Handler<T>
{
public void Handle(T msg)
{
var bw = new BackgroundWorker();
bw.DoWork += (o, e) => Inner.Handle(msg);
bw.RunWorkerCompleted += (o, e) => bw.Dispose();
bw.RunWorkerAsync();
}
public Handler<T> Inner { get; set; }
}
// filters - nothing unusual here
public class Step1 : Filter<OrderMatrixMessage>
{
public void Execute(OrderMatrixMessage input)
{
Console.WriteLine("Step1");
}
}
public class Step2 : Filter<OrderMatrixMessage>
{
public void Execute(OrderMatrixMessage input)
{
Console.WriteLine("Step2");
}
}
public class Step3 : Filter<OrderMatrixMessage>
{
public void Execute(OrderMatrixMessage input)
{
if(input.PretendAnErrorHappened)
{
input.Stop = true;
throw new Exception("oh oh! Something went wrong!");
}
Console.WriteLine("Step3");
}
}
public class Step4 : Filter<OrderMatrixMessage>
{
public void Execute(OrderMatrixMessage input)
{
Console.WriteLine("Step4");
}
}
public interface Filter<T>
{
void Execute(T msg);
}
public interface Handler<T>
{
void Handle(T msg);
Handler<T> Inner { get; set; }
}
public abstract class BaseMessage
{
public bool Stop { get; set; }
public Action<String> OnError { get; set; }
public Action OnSuccess { get; set; }
public Action OnStart { get; set; }
public Action OnDone { get; set; }
}
public class Task<T> where T : BaseMessage
{
public Task()
{
// initialise both the handler and pipeline to a new instance of the Pipeline class
// setting handler to this ensures Invoke will always execute correctly even if no aspects are registered
this.handler = this.pipeline = new Pipeline<T>();
}
// Wrap takes a Handler<T> which all aspects must implement. The logic here wires each handler to the previous to set up a chain
public void Wrap(Handler<T> h)
{
h.Inner = this.handler;
this.handler = h;
}
// Register just passes on the filter to the inner pipeline
public void Register(Filter<T> filter)
{
pipeline.Add(filter);
}
// Register many in one call
public void Register(params Filter<T>[] filters)
{
foreach (var filter in filters)
{
pipeline.Add(filter);
}
}
// starting with the first aspect handler (if any) start the pipeline executing
public void Invoke(T msg)
{
this.handler.Handle(msg);
}
Handler<T> handler;
Pipeline<T> pipeline;
// private implementation
class Pipeline<T1> : Handler<T1> where T1 : BaseMessage
{
public void Add(Filter<T1> filter)
{
filters.Add(filter);
}
public void Handle(T1 msg)
{
msg.OnStart();
try
{
foreach (var f in filters)
{
// we have access to the Stop property within the pipeline because all messages must inherit from BaseMessage via the where T1 constraint above
if (msg.Stop)
return;
f.Execute(msg);
}
if(msg.OnSuccess != null)
msg.OnSuccess();
}
catch (Exception ex)
{
if (msg.OnError != null)
msg.OnError(ex.Message);
throw;
}
finally
{
if(msg.OnDone != null)
msg.OnDone();
}
}
public Handler<T1> Inner { get; set; }
List<Filter<T1>> filters = new List<Filter<T1>>();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment