Last active June 11, 2021 07:48
Extending the code-first strongly typed pipelines to show how they might be loaded from config
using System;
namespace StronglyTypedPipelines
/// <summary>
/// Non-generic root interface shared by every pipeline step.
/// It exists so that reflection-based code can create and hold a step from its type name
/// before knowing anything about the step's generic parameters.
/// </summary>
public interface IPipelineStep
{
}
/// <summary>
/// Contract for a single pipeline step: it maps one input value to one output value.
/// The input and output types are independent and may differ.
/// </summary>
/// <typeparam name="INPUT">Type of value the step consumes.</typeparam>
/// <typeparam name="OUTPUT">Type of value the step produces.</typeparam>
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
    /// <summary>
    /// Transforms <paramref name="input"/> into this step's output value.
    /// </summary>
    /// <param name="input">The value to process.</param>
    /// <returns>The processed value.</returns>
    OUTPUT Process(INPUT input);
}
/// <summary>
/// Hosts the fluent <c>Step</c> extension used to chain pipeline steps into a data flow.
/// </summary>
public static class PipelineStepExtensions
{
    /// <summary>
    /// Passes <paramref name="input"/> through <paramref name="step"/>.
    /// </summary>
    /// <param name="input">The value flowing into the step.</param>
    /// <param name="step">The step to run.</param>
    /// <returns>Whatever <paramref name="step"/> returns for <paramref name="input"/>.</returns>
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
        => step.Process(input);
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types use their constructor to compose a set of pipeline steps together using
/// the PipelineStepExtensions.Step() method, and assign the result to the
/// <see cref="PipelineSteps"/> property here.
/// The initial and final types of the set of steps must match the input and output types of
/// this class, but the intermediate types can vary.
/// A pipeline is itself a step, so pipelines can be nested inside other pipelines.
/// </summary>
/// <typeparam name="INPUT">Type of value the pipeline accepts.</typeparam>
/// <typeparam name="OUTPUT">Type of value the pipeline produces.</typeparam>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    /// <summary>
    /// The composed steps, expressed as a single function from input to output.
    /// Descendants are expected to assign this before <see cref="Process"/> is called.
    /// </summary>
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    /// <summary>
    /// Runs <paramref name="input"/> through the composed steps.
    /// </summary>
    /// <param name="input">The value to feed into the first step.</param>
    /// <returns>The value produced by the final step.</returns>
    /// <exception cref="InvalidOperationException">
    /// The descendant type never assigned <see cref="PipelineSteps"/>.
    /// </exception>
    public OUTPUT Process(INPUT input)
    {
        // Read the property once so the null check and the call see the same delegate.
        Func<INPUT, OUTPUT> steps = PipelineSteps;
        if (steps == null)
        {
            // Fail with a message that points at the real mistake instead of a bare
            // NullReferenceException from invoking a null delegate.
            throw new InvalidOperationException(
                string.Format("{0} has not assigned its PipelineSteps, so it cannot process input.", GetType().Name));
        }

        return steps(input);
    }
}
namespace StronglyTypedPipelines
/// <summary>
/// Example pipeline: doubles an integer, renders it as text, duplicates that text
/// around a "." and parses the result as a float.
/// </summary>
public class BasicPipeline : Pipeline<int, float>
{
    public BasicPipeline()
    {
        PipelineSteps = input =>
        {
            // Each intermediate value is named to make the changing types visible.
            int doubled = input.Step(new DoubleStep());
            string text = doubled.Step(new ToStringStep());
            string duplicated = text.Step(new DuplicateStep());
            return duplicated.Step(new ToFloatStep());
        };
    }
}
/// <summary>
/// Example int-to-int pipeline: divides by three as a float, then converts back to an integer.
/// Used as a nested step by other pipelines.
/// </summary>
public class InnerPipeline : Pipeline<int, int>
{
    public InnerPipeline()
    {
        PipelineSteps = input =>
        {
            float third = input.Step(new ThirdStep());
            return third.Step(new RoundStep());
        };
    }
}
/// <summary>
/// Example pipeline showing that a whole pipeline (<c>InnerPipeline</c>) can be used
/// as a single step: double, run the inner pipeline, then convert to a string.
/// </summary>
public class NestedPipeline : Pipeline<int, string>
{
    public NestedPipeline()
    {
        PipelineSteps = input =>
        {
            int doubled = input.Step(new DoubleStep());
            int fromInner = doubled.Step(new InnerPipeline());
            return fromInner.Step(new ToStringStep());
        };
    }
}
/// <summary>
/// Example pipeline with conditional steps:
/// values above 50 are first passed through <c>InnerPipeline</c>; the result is then halved
/// when it is above 100 and doubled otherwise; finally it is converted to a string.
/// </summary>
public class BranchingPipeline : Pipeline<int, string>
{
    public BranchingPipeline()
    {
        // Each conditional step's constructor call is closed before the next .Step(),
        // so all three steps chain off the running value rather than off InnerPipeline.
        PipelineSteps = input => input
            .Step(new OptionalStep<int, int>(
                f => f > 50,
                new InnerPipeline()
            ))
            .Step(new ChoiceStep<int, int>(
                f => f > 100,
                new HalfStep(),
                new DoubleStep()
            ))
            .Step(new ToStringStep());
    }
}
namespace StronglyTypedPipelines
/// <summary>
/// Step that multiplies an integer by two.
/// </summary>
public class DoubleStep : IPipelineStep<int, int>
{
    /// <summary>Returns <paramref name="input"/> multiplied by two.</summary>
    public int Process(int input) => input * 2;
}
/// <summary>
/// Step that halves an integer using integer division.
/// </summary>
public class HalfStep : IPipelineStep<int, int>
{
    /// <summary>Returns <paramref name="input"/> divided by two (integer division).</summary>
    public int Process(int input) => input / 2;
}
/// <summary>
/// Step that divides an integer by three, producing a float.
/// </summary>
public class ThirdStep : IPipelineStep<int, float>
{
    /// <summary>Returns <paramref name="input"/> divided by 3 as a single-precision value.</summary>
    public float Process(int input) => input / 3f;
}
/// <summary>
/// Step that converts a float to an integer with a plain cast.
/// </summary>
/// <remarks>
/// NOTE(review): the name suggests rounding, but the cast discards the fractional part
/// rather than rounding to nearest - confirm this is the intended behaviour.
/// </remarks>
public class RoundStep : IPipelineStep<float, int>
{
    /// <summary>Returns <paramref name="input"/> cast to <see cref="int"/>.</summary>
    public int Process(float input) => (int)input;
}
/// <summary>
/// Step that converts an integer to its string representation.
/// </summary>
public class ToStringStep : IPipelineStep<int, string>
{
    /// <summary>Returns the result of calling <c>ToString()</c> on <paramref name="input"/>.</summary>
    public string Process(int input) => input.ToString();
}
/// <summary>
/// Step that repeats a string twice, joined by a single "." character.
/// </summary>
public class DuplicateStep : IPipelineStep<string, string>
{
    /// <summary>Returns <paramref name="input"/>, a ".", then <paramref name="input"/> again.</summary>
    public string Process(string input) => string.Concat(input, ".", input);
}
/// <summary>
/// Step that parses a string into a float.
/// </summary>
public class ToFloatStep : IPipelineStep<string, float>
{
    /// <summary>
    /// Parses <paramref name="input"/> as a single-precision number.
    /// </summary>
    /// <param name="input">Text using "." as the decimal separator.</param>
    /// <returns>The parsed value.</returns>
    /// <exception cref="System.FormatException"><paramref name="input"/> is not a valid number.</exception>
    public float Process(string input)
    {
        // The text handed to this step is built with a hard-coded "." separator (see DuplicateStep),
        // so it must be parsed with the invariant culture. Parsing with the current culture would
        // misread or reject the value on machines whose decimal separator is not ".".
        return float.Parse(input, System.Globalization.CultureInfo.InvariantCulture);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Xml.Linq;
namespace StronglyTypedPipelines
/// <summary>
/// A pipeline whose steps are listed in an XML element instead of being composed in code.
/// Each child <c>step</c> element names a step type through its <c>type</c> attribute.
/// The types are created by reflection, checked so that every step's input type equals the
/// previous step's output type, and then executed in document order.
/// </summary>
/// <typeparam name="INPUT">Type of value the pipeline accepts.</typeparam>
/// <typeparam name="OUTPUT">Type of value the pipeline produces.</typeparam>
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
    /// <summary>
    /// Builds and validates the pipeline described by <paramref name="pipelineXml"/>.
    /// </summary>
    /// <param name="pipelineXml">Element whose <c>step</c> children describe the steps.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipelineXml"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A step could not be created, or the chain of step types does not lead from
    /// <typeparamref name="INPUT"/> to <typeparamref name="OUTPUT"/>.
    /// </exception>
    public ConfigBasedPipeline(XElement pipelineXml)
    {
        if (pipelineXml == null)
        {
            throw new ArgumentNullException(nameof(pipelineXml));
        }

        var pipelineSteps = parsePipelineSteps(pipelineXml);

        // Fail at construction time with a descriptive message, rather than with a
        // reflection error in the middle of processing the first input.
        validatePipelineSteps(pipelineSteps);

        PipelineSteps = input => processPipelineSteps(pipelineSteps, input);
    }

    /// <summary>
    /// Runs <paramref name="input"/> through every step in order using reflection.
    /// </summary>
    private OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, INPUT input)
    {
        object output = input;

        foreach (IPipelineStep step in pipelineSteps)
        {
            // Resolve Process through the generic step interface instead of by name on the
            // concrete type, so additional Process overloads or an explicit interface
            // implementation on the step class cannot break the lookup.
            MethodInfo mi = getStepInterface(step).GetMethod("Process");
            output = mi.Invoke(step, new[] { output });
        }

        return (OUTPUT)output;
    }

    /// <summary>
    /// Creates one step instance for each <c>step</c> child of <paramref name="pipelineXml"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// A <c>step</c> element has no usable <c>type</c> attribute, or the named type cannot be
    /// loaded, is not a pipeline step, is abstract, or has no public parameterless constructor.
    /// </exception>
    private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
    {
        var pipeline = new List<IPipelineStep>();

        foreach (var xStep in pipelineXml.Elements("step"))
        {
            XAttribute typeAttribute = xStep.Attribute("type");
            string typeName = typeAttribute == null ? null : typeAttribute.Value;
            if (string.IsNullOrWhiteSpace(typeName))
            {
                throw new InvalidOperationException("A <step> element is missing its 'type' attribute.");
            }

            var type = Type.GetType(typeName);
            if (type == null)
            {
                throw new InvalidOperationException(
                    string.Format("Step type '{0}' could not be loaded.", typeName));
            }

            if (type.IsAbstract || !typeof(IPipelineStep).IsAssignableFrom(type))
            {
                throw new InvalidOperationException(
                    string.Format("Step type '{0}' is not a concrete pipeline step.", typeName));
            }

            var ctr = type.GetConstructor(Type.EmptyTypes);
            if (ctr == null)
            {
                throw new InvalidOperationException(
                    string.Format("Step type '{0}' has no public parameterless constructor.", typeName));
            }

            // A parameterless constructor takes no arguments, so pass null rather than a Type array.
            var obj = (IPipelineStep)ctr.Invoke(null);

            pipeline.Add(obj);
        }

        return pipeline;
    }

    /// <summary>
    /// Checks that the steps form an unbroken chain of types from
    /// <typeparamref name="INPUT"/> to <typeparamref name="OUTPUT"/>.
    /// Types must match exactly; assignment-compatible types are not accepted.
    /// </summary>
    /// <exception cref="InvalidOperationException">The chain of types is broken.</exception>
    private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
    {
        int stepNumber = 0;

        // The pipeline's own generic arguments are the start and end of the chain.
        Type currentInputType = typeof(INPUT);
        Type outputType = typeof(OUTPUT);

        foreach (var step in pipelineSteps)
        {
            stepNumber += 1;

            Type stepBaseInterface = getStepInterface(step);
            Type stepInType = stepBaseInterface.GenericTypeArguments[0];
            Type stepOutType = stepBaseInterface.GenericTypeArguments[1];

            if (currentInputType != stepInType)
            {
                string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
            }

            currentInputType = stepOutType;
        }

        if (currentInputType != outputType)
        {
            if (pipelineSteps.Count == 0)
            {
                // With no steps there is no "final step" to report, so describe the real problem.
                string emptyMsg = "Pipeline contains no steps, so its input type {0} must equal its output type {1}.";
                throw new InvalidOperationException(string.Format(emptyMsg, currentInputType.Name, outputType.Name));
            }

            string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
            throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
        }
    }

    /// <summary>
    /// Finds the generic two-argument step interface implemented by <paramref name="step"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The step does not implement that interface.</exception>
    private static Type getStepInterface(IPipelineStep step)
    {
        Type stepInterface = step.GetType().GetInterface("IPipelineStep`2");
        if (stepInterface == null)
        {
            throw new InvalidOperationException(
                string.Format("Step {0} does not implement the generic IPipelineStep interface.", step.GetType().Name));
        }

        return stepInterface;
    }
}
/// <summary>
/// Concrete int-to-string pipeline whose steps come entirely from XML configuration.
/// </summary>
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
    /// <summary>
    /// Hands <paramref name="pipelineXml"/> straight to the base class.
    /// </summary>
    /// <param name="pipelineXml">Element describing the steps of this pipeline.</param>
    public ExampleConfigBasedPipeline(XElement pipelineXml)
        : base(pipelineXml)
    {
    }
}
<?xml version="1.0" encoding="utf-8" ?>
<pipeline name="example">
<step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
<step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" />
<step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->
</pipeline>
using System;
namespace StronglyTypedPipelines
/// <summary>
/// Wraps another step and runs it only when a predicate accepts the input.
/// When the predicate rejects the input, the input itself is returned, which is why
/// <typeparamref name="INPUT"/> must be convertible to <typeparamref name="OUTPUT"/>.
/// </summary>
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private readonly Func<INPUT, bool> _shouldRun;
    private readonly IPipelineStep<INPUT, OUTPUT> _wrapped;

    /// <param name="choice">Predicate deciding whether <paramref name="step"/> runs.</param>
    /// <param name="step">The step to run when <paramref name="choice"/> returns true.</param>
    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        _shouldRun = choice;
        _wrapped = step;
    }

    /// <summary>
    /// Returns the wrapped step's result when the predicate is true; otherwise returns
    /// <paramref name="input"/> unchanged.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        if (!_shouldRun(input))
        {
            return input;
        }

        return _wrapped.Process(input);
    }
}
/// <summary>
/// Runs one of two alternative steps, selected per input by a predicate.
/// </summary>
/// <remarks>
/// Unlike <c>OptionalStep</c>, this type never returns the input itself, so it places no
/// constraint between <typeparamref name="INPUT"/> and <typeparamref name="OUTPUT"/>;
/// for example an int-to-string choice is valid.
/// </remarks>
/// <typeparam name="INPUT">Type of value both alternatives accept.</typeparam>
/// <typeparam name="OUTPUT">Type of value both alternatives produce.</typeparam>
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly IPipelineStep<INPUT, OUTPUT> _first;
    private readonly IPipelineStep<INPUT, OUTPUT> _second;
    private readonly Func<INPUT, bool> _choice;

    /// <param name="choice">Predicate that selects the alternative for a given input.</param>
    /// <param name="first">Step run when <paramref name="choice"/> returns true.</param>
    /// <param name="second">Step run when <paramref name="choice"/> returns false.</param>
    public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
    {
        _choice = choice;
        _first = first;
        _second = second;
    }

    /// <summary>
    /// Processes <paramref name="input"/> with the first step when the predicate is true,
    /// and with the second step otherwise.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        return _choice(input)
            ? _first.Process(input)
            : _second.Process(input);
    }
}
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Xml.Linq;
namespace StronglyTypedPipelines
class Program
/// <summary>
/// Runs <c>BasicPipeline</c> on a fixed input and prints the input and output with their types.
/// </summary>
private static void BasicPipelineTest()
{
    Console.WriteLine("Basic Pipeline Test");

    int seed = 49;
    Console.WriteLine("Input Value: {0} [{1}]", seed, seed.GetType().Name);

    float result = new BasicPipeline().Process(seed);
    Console.WriteLine("Output Value: {0} [{1}]", result, result.GetType().Name);
}
/// <summary>
/// Runs <c>NestedPipeline</c> on a fixed input and prints the input and output with their types.
/// </summary>
private static void NestedPipelineTest()
{
    Console.WriteLine("Nested Pipeline Test");

    int seed = 103;
    Console.WriteLine("Input Value: {0} [{1}]", seed, seed.GetType().Name);

    string result = new NestedPipeline().Process(seed);
    Console.WriteLine("Output Value: {0} [{1}]", result, result.GetType().Name);
}
/// <summary>
/// Runs <c>BranchingPipeline</c> over several inputs chosen to exercise its different branches,
/// printing each input and output with their types.
/// </summary>
private static void BranchingPipelineTest()
{
    Console.WriteLine("Branching Pipeline Test");

    int[] samples = { 1, 10, 100, 1000 };
    for (int i = 0; i < samples.Length; i++)
    {
        int sample = samples[i];
        Console.WriteLine("Input Value: {0} [{1}]", sample, sample.GetType().Name);

        // A fresh pipeline is built for every sample.
        string result = new BranchingPipeline().Process(sample);
        Console.WriteLine("Output Value: {0} [{1}]", result, result.GetType().Name);
    }
}
/// <summary>
/// Loads a pipeline definition from "ConfigBasedPipeline.xml", runs it on a fixed input and
/// prints the input and output with their types. Any failure while loading, building or
/// running the pipeline is reported on the console instead of terminating the program.
/// </summary>
private static void ModifiablePipelineTest()
{
    Console.WriteLine("Configured Pipeline Test");

    var input = 13;
    Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));

    // The catch needs a matching try: loading the file, constructing the pipeline
    // and processing the input can each throw.
    try
    {
        XDocument xd = XDocument.Load("ConfigBasedPipeline.xml");

        // Patching the configuration data would go here

        // If the root element is not <pipeline> this is null and the pipeline
        // constructor rejects it, which is reported by the catch below.
        XElement pipelineXml = xd.Element("pipeline");

        var pipeline = new ExampleConfigBasedPipeline(pipelineXml);
        var output = pipeline.Process(input);

        Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
    }
    catch (Exception ex)
    {
        Console.WriteLine("ERROR: " + ex.Message);
    }
}
static void Main(string[] args)
Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?

Glad you're getting on top of all this though - happy to have been able to help. I'm certainly not going to waste all this interesting material - what I've worked out will end up on my blog, and I'll add new gists for the changed code as well.

