Last active
August 29, 2015 14:12
-
-
Save SteveBate/5db6081bc67354877360 to your computer and use it in GitHub Desktop.
A version of the pipe and filters task but with the ability to also specify middleware (aspects) at the same time.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Main() | |
{ | |
// create a pipeline task | |
var mytask = new Task<OrderMatrixMessage>(); | |
// register aspects | |
mytask.Wrap(new LoggingAspect<OrderMatrixMessage>()); | |
mytask.Wrap(new ExceptionLoggingAspect<OrderMatrixMessage>()); | |
mytask.Wrap(new AsyncAspect<OrderMatrixMessage>()); | |
// register filters | |
mytask.Register(new Step1()); | |
mytask.Register(new Step2()); | |
mytask.Register(new Step3()); | |
mytask.Register(new Step4()); | |
// create a message. Change the PretendAnErrorHappened value to true to see the message handler kick in. | |
var msg = new OrderMatrixMessage { PinNo = "BB124", PretendAnErrorHappened = true }; | |
msg.OnStart = () => { Console.WriteLine("OnStart"); }; | |
msg.OnDone = () => { Console.WriteLine("OnDone"); }; | |
msg.OnSuccess = () => { Console.WriteLine("OnSuccess"); }; | |
msg.OnError = (e) => { Console.WriteLine("OnError"); }; | |
// run it | |
mytask.Invoke(msg); | |
} | |
// our message | |
public class OrderMatrixMessage : BaseMessage | |
{ | |
public string PinNo { get; set; } | |
public bool PretendAnErrorHappened { get; set; } | |
public override string ToString() | |
{ | |
return string.Format("PinNo: {0}", PinNo); | |
} | |
} | |
// simple logging aspect | |
public class LoggingAspect<T> : Handler<T> | |
{ | |
public void Handle(T msg) | |
{ | |
Console.WriteLine(string.Format("Logging Message Before: {0}", msg)); | |
Inner.Handle(msg); | |
Console.WriteLine(string.Format("Logging Message After: {0}", msg)); | |
} | |
public Handler<T> Inner { get; set; } | |
} | |
// exception logging aspect | |
public class ExceptionLoggingAspect<T> : Handler<T> | |
{ | |
public void Handle(T msg) | |
{ | |
try | |
{ | |
Inner.Handle(msg); | |
} | |
catch(Exception ex) | |
{ | |
Console.WriteLine("Logging error: {0}", ex); | |
} | |
} | |
public Handler<T> Inner { get; set; } | |
} | |
// An aspect to invoke the filters on a background thread | |
public class AsyncAspect<T> : Handler<T> | |
{ | |
public void Handle(T msg) | |
{ | |
var bw = new BackgroundWorker(); | |
bw.DoWork += (o, e) => Inner.Handle(msg); | |
bw.RunWorkerCompleted += (o, e) => bw.Dispose(); | |
bw.RunWorkerAsync(); | |
} | |
public Handler<T> Inner { get; set; } | |
} | |
// filters - nothing unusual here | |
public class Step1 : Filter<OrderMatrixMessage> | |
{ | |
public void Execute(OrderMatrixMessage input) | |
{ | |
Console.WriteLine("Step1"); | |
} | |
} | |
public class Step2 : Filter<OrderMatrixMessage> | |
{ | |
public void Execute(OrderMatrixMessage input) | |
{ | |
Console.WriteLine("Step2"); | |
} | |
} | |
public class Step3 : Filter<OrderMatrixMessage> | |
{ | |
public void Execute(OrderMatrixMessage input) | |
{ | |
if(input.PretendAnErrorHappened) | |
{ | |
input.Stop = true; | |
throw new Exception("oh oh! Something went wrong!"); | |
} | |
Console.WriteLine("Step3"); | |
} | |
} | |
public class Step4 : Filter<OrderMatrixMessage> | |
{ | |
public void Execute(OrderMatrixMessage input) | |
{ | |
Console.WriteLine("Step4"); | |
} | |
} | |
public interface Filter<T> | |
{ | |
void Execute(T msg); | |
} | |
public interface Handler<T> | |
{ | |
void Handle(T msg); | |
Handler<T> Inner { get; set; } | |
} | |
public abstract class BaseMessage | |
{ | |
public bool Stop { get; set; } | |
public Action<String> OnError { get; set; } | |
public Action OnSuccess { get; set; } | |
public Action OnStart { get; set; } | |
public Action OnDone { get; set; } | |
} | |
public class Task<T> where T : BaseMessage | |
{ | |
public Task() | |
{ | |
// initialise both the handler and pipeline to a new instance of the Pipeline class | |
// setting handler to this ensures Invoke will always execute correctly even if no aspects are registered | |
this.handler = this.pipeline = new Pipeline<T>(); | |
} | |
// Wrap takes a Handler<T> which all aspects must implement. The logic here wires each handler to the previous to set up a chain | |
public void Wrap(Handler<T> h) | |
{ | |
h.Inner = this.handler; | |
this.handler = h; | |
} | |
// Register just passes on the filter to the inner pipeline | |
public void Register(Filter<T> filter) | |
{ | |
pipeline.Add(filter); | |
} | |
// Register many in one call | |
public void Register(params Filter<T>[] filters) | |
{ | |
foreach (var filter in filters) | |
{ | |
pipeline.Add(filter); | |
} | |
} | |
// starting with the first aspect handler (if any) start the pipeline executing | |
public void Invoke(T msg) | |
{ | |
this.handler.Handle(msg); | |
} | |
Handler<T> handler; | |
Pipeline<T> pipeline; | |
// private implementation | |
class Pipeline<T1> : Handler<T1> where T1 : BaseMessage | |
{ | |
public void Add(Filter<T1> filter) | |
{ | |
filters.Add(filter); | |
} | |
public void Handle(T1 msg) | |
{ | |
msg.OnStart(); | |
try | |
{ | |
foreach (var f in filters) | |
{ | |
// we have access to the Stop property within the pipeline because all messages must inherit from BaseMessage via the where T1 constraint above | |
if (msg.Stop) | |
return; | |
f.Execute(msg); | |
} | |
if(msg.OnSuccess != null) | |
msg.OnSuccess(); | |
} | |
catch (Exception ex) | |
{ | |
if (msg.OnError != null) | |
msg.OnError(ex.Message); | |
throw; | |
} | |
finally | |
{ | |
if(msg.OnDone != null) | |
msg.OnDone(); | |
} | |
} | |
public Handler<T1> Inner { get; set; } | |
List<Filter<T1>> filters = new List<Filter<T1>>(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment