Skip to content

Instantly share code, notes, and snippets.

@ralfw
Last active January 5, 2021 22:01
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ralfw/6420924 to your computer and use it in GitHub Desktop.
Save ralfw/6420924 to your computer and use it in GitHub Desktop.
Pipes and filters à la Steve Bate - but with a twist :-) Filters can be not only classes, but also object instances and plain functions.
/// <summary>
/// A pipes-and-filters pipeline over messages of type <typeparamref name="T"/>.
/// Stages are executed in registration order; each stage receives the output of the previous one.
/// A pipeline is itself a filter, so pipelines can be nested.
/// </summary>
/// <remarks>
/// Processing mutates the internal catcher stack, so a single instance must not be
/// processed concurrently from several threads.
/// </remarks>
public class PipelineOf<T> : IFilterOf<T>
{
    /// <summary>Outcome of <see cref="ProcessSafely"/>: either a value or the exception that ended the run.</summary>
    public class Result
    {
        /// <summary>Creates a successful result carrying <paramref name="value"/>.</summary>
        public static Result Success(T value)
        {
            return new Result { IsSuccess = true, Value = value };
        }

        /// <summary>Creates a failed result carrying <paramref name="exception"/>.</summary>
        public static Result Failure(Exception exception)
        {
            return new Result { HasFailed = true, Exception = exception };
        }

        public bool IsSuccess { get; private set; }
        public bool HasFailed { get; private set; }
        public T Value { get; private set; }
        public Exception Exception { get; private set; }
    }

    // Associates a Try() region with the pipeline that handles exceptions raised inside it.
    // CatchPipeline stays null until the matching Catch() call supplies it.
    private class Catcher
    {
        public PipelineOf<Exception> CatchPipeline;
    }

    private readonly IList<Func<T, T>> _stages = new List<Func<T, T>>();

    // Used twice: while configuring (Try pushes, Catch pops) to pair Try/Catch calls,
    // and while processing (Try stage pushes, Catch stage pops) to track the active region.
    private readonly Stack<Catcher> _catchers = new Stack<Catcher>();

    /// <summary>Registers an existing filter object; the same instance is used for every message.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="singletonFilter"/> is null.</exception>
    public PipelineOf<T> Register(IFilterOf<T> singletonFilter)
    {
        if (singletonFilter == null)
        {
            throw new ArgumentNullException("singletonFilter");
        }

        _stages.Add(singletonFilter.Process);
        return this;
    }

    /// <summary>Registers a filter class; a fresh <typeparamref name="F"/> is created for every message.</summary>
    public PipelineOf<T> Register<F>() where F : IFilterOf<T>, new()
    {
        _stages.Add(msg =>
        {
            var f = new F();
            return f.Process(msg);
        });
        return this;
    }

    /// <summary>Registers a plain function as a filter.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
    public PipelineOf<T> Register(Func<T, T> filter)
    {
        if (filter == null)
        {
            throw new ArgumentNullException("filter");
        }

        _stages.Add(filter);
        return this;
    }

    /// <summary>
    /// Opens a guarded region. Exceptions thrown by stages registered between this call and the
    /// matching <see cref="Catch"/> are passed to that call's catch pipeline.
    /// </summary>
    public PipelineOf<T> Try()
    {
        var catcher = new Catcher();
        _catchers.Push(catcher); // configuration time: lets the matching Catch() find this catcher
        Register(msg =>
        {
            _catchers.Push(catcher); // processing time: the region becomes active
            return msg;
        });
        return this;
    }

    /// <summary>Closes the innermost open <see cref="Try"/> region and assigns its handler pipeline.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="catchPipeline"/> is null.</exception>
    /// <exception cref="InvalidOperationException">There is no open <see cref="Try"/> region.</exception>
    public PipelineOf<T> Catch(PipelineOf<Exception> catchPipeline)
    {
        if (catchPipeline == null)
        {
            throw new ArgumentNullException("catchPipeline");
        }

        _catchers.Pop().CatchPipeline = catchPipeline;
        Register(msg =>
        {
            _catchers.Pop(); // processing time: the region has been left without failure
            return msg;
        });
        return this;
    }

    /// <summary>Runs <paramref name="message"/> through all stages in registration order.</summary>
    /// <returns>The output of the last stage, or <paramref name="message"/> itself if no stage is registered.</returns>
    /// <exception cref="WarningException">
    /// A stage threw inside a Try/Catch region; the catch pipeline has already processed the
    /// original exception, which is available as the inner exception.
    /// </exception>
    /// <exception cref="ApplicationException">
    /// A stage threw and no catch pipeline was active; the original exception is the inner exception.
    /// </exception>
    public T Process(T message)
    {
        // Remember the stack depth so catchers pushed by Try stages during this run can be
        // unwound when a stage throws before the matching Catch stage is reached. Without this
        // a stale catcher would survive the run and handle unrelated failures of later runs.
        var depthAtEntry = _catchers.Count;
        try
        {
            return _stages.Aggregate(message, (current, stage) => stage(current));
        }
        catch (Exception ex)
        {
            // A Try() that was never closed by Catch() has no handler; treat it like no region at all.
            var handler = _catchers.Count > 0 ? _catchers.Peek().CatchPipeline : null;
            if (handler == null)
            {
                throw new ApplicationException("No catch filter registered!", ex);
            }

            handler.Process(ex);
            throw new WarningException("Exception handled by catch filter.", ex);
        }
        finally
        {
            while (_catchers.Count > depthAtEntry)
            {
                _catchers.Pop();
            }
        }
    }

    /// <summary>
    /// Like <see cref="Process"/>, but reports a failure that was handled by a catch pipeline as a
    /// failed <see cref="Result"/> instead of throwing. Failures without an active catch pipeline
    /// still surface as <see cref="ApplicationException"/>.
    /// </summary>
    public Result ProcessSafely(T message)
    {
        try
        {
            return Result.Success(Process(message));
        }
        catch (WarningException ex)
        {
            return Result.Failure(ex);
        }
    }
}

/// <summary>A filter that transforms a message of type <typeparamref name="T"/>.</summary>
public interface IFilterOf<T>
{
    /// <summary>Processes <paramref name="message"/> and returns the resulting message.</summary>
    T Process(T message);
}
// Demo 1: three function filters appended one after another; no stage fails.
var greeting = new PipelineOf<string>();
greeting.Register(msg => msg + "Hello");
greeting.Register(msg => msg + ", ");
greeting.Register(msg => msg + "world!");
var outcome = greeting.ProcessSafely("the message is: ");
Console.WriteLine("result({0}, '{1}')", outcome.IsSuccess, outcome.Value);

// Demo 2: the third filter throws inside a Try/Catch region, so the catch pipeline
// prints the exception message and ProcessSafely returns a failed result.
var guarded = new PipelineOf<string>();
guarded.Try();
guarded.Register(msg => msg + "Hello");
guarded.Register(msg => msg + ", ");
guarded.Register(msg => { throw new ApplicationException("argh!!!"); });
guarded.Register(msg => msg + "world!");
var reporter = new PipelineOf<Exception>();
reporter.Register(ex => { Console.WriteLine("*** {0}", ex.Message); return ex; });
guarded.Catch(reporter);
outcome = guarded.ProcessSafely("the message is: ");
Console.WriteLine("result({0}, '{1}')", outcome.HasFailed, outcome.Exception.Message);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment