Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Extending the code-first strongly typed pipelines to show how they might be loaded from config
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// A base interface required so that reflection code can create a Step from its type name,
/// without needing to understand its generic parameters first.
/// </summary>
public interface IPipelineStep
{
}
/// <summary>
/// Base type for individual pipeline steps.
/// Descendants of this type map an input value to an output value.
/// The input and output types can differ.
/// </summary>
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
OUTPUT Process(INPUT input);
}
/// <summary>
/// An extension method for combining PipelineSteps together into a data flow.
/// </summary>
public static class PipelineStepExtensions
{
public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
{
return step.Process(input);
}
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types can use their constructor to compile a set of PipelineSteps together
/// the PipelineStepExtensions.Step() method, and assign this to the PipelineSteps property here.
/// The initial and final types of the set of steps must match the input and output types of this class,
/// but the intermediate types can vary.
/// </summary>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }
public OUTPUT Process(INPUT input)
{
return PipelineSteps(input);
}
}
}
namespace StronglyTypedPipelines
{
public class BasicPipeline : Pipeline<int, float>
{
public BasicPipeline()
{
PipelineSteps = input => input
.Step(new DoubleStep())
.Step(new ToStringStep())
.Step(new DuplicateStep())
.Step(new ToFloatStep());
}
}
public class InnerPipeline : Pipeline<int, int>
{
public InnerPipeline()
{
PipelineSteps = input => input
.Step(new ThirdStep())
.Step(new RoundStep());
}
}
public class NestedPipeline : Pipeline<int, string>
{
public NestedPipeline()
{
PipelineSteps = input => input
.Step(new DoubleStep())
.Step(new InnerPipeline())
.Step(new ToStringStep());
}
}
public class BranchingPipeline : Pipeline<int, string>
{
public BranchingPipeline()
{
PipelineSteps = input => input
.Step(new OptionalStep<int, int>(
f => f > 50,
new InnerPipeline()
))
.Step(new ChoiceStep<int, int>(
f => f > 100,
new HalfStep(),
new DoubleStep()
))
.Step(new ToStringStep());
}
}
}
namespace StronglyTypedPipelines
{
public class DoubleStep : IPipelineStep<int, int>
{
public int Process(int input)
{
return input * 2;
}
}
public class HalfStep : IPipelineStep<int,int>
{
public int Process(int input)
{
return input / 2;
}
}
public class ThirdStep : IPipelineStep<int, float>
{
public float Process(int input)
{
return input / 3f;
}
}
public class RoundStep : IPipelineStep<float, int>
{
public int Process(float input)
{
return (int)input;
}
}
public class ToStringStep : IPipelineStep<int, string>
{
public string Process(int input)
{
return input.ToString();
}
}
public class DuplicateStep : IPipelineStep<string, string>
{
public string Process(string input)
{
return input + "." + input;
}
}
public class ToFloatStep : IPipelineStep<string, float>
{
public float Process(string input)
{
return float.Parse(input);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
public ConfigBasedPipeline(XElement pipelineXml)
{
if (pipelineXml == null)
{
throw new ArgumentNullException(nameof(pipelineXml));
}
var pipelineSteps = parsePipelineSteps(pipelineXml);
validatePipelineSteps(pipelineSteps);
PipelineSteps = input => processPipelineSteps(pipelineSteps, input);
}
private OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, INPUT input)
{
object output = input;
foreach (IPipelineStep step in pipelineSteps)
{
MethodInfo mi = step.GetType().GetMethod("Process", BindingFlags.Public | BindingFlags.Instance);
output = mi.Invoke(step, new[] { output });
}
return (OUTPUT)output;
}
private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
{
var pipeline = new List<IPipelineStep>();
foreach (var xStep in pipelineXml.Elements("step"))
{
string typeName = xStep.Attribute("type").Value;
var type = Type.GetType(typeName);
var ctr = type.GetConstructor(Type.EmptyTypes);
var obj = (IPipelineStep)ctr.Invoke(Type.EmptyTypes);
pipeline.Add(obj);
}
return pipeline;
}
private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
{
int stepNumber = 0;
Type pipelineBaseInterface = this.GetType().GetInterface("IPipelineStep`2");
Type currentInputType = pipelineBaseInterface.GenericTypeArguments[0];
Type outputType = pipelineBaseInterface.GenericTypeArguments[1];
foreach (var step in pipelineSteps)
{
stepNumber += 1;
Type stepBaseInterface = step.GetType().GetInterface("IPipelineStep`2");
Type stepInType = stepBaseInterface.GenericTypeArguments[0];
Type stepOutType = stepBaseInterface.GenericTypeArguments[1];
if (currentInputType != stepInType)
{
string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
}
currentInputType = stepOutType;
}
if (currentInputType != outputType)
{
string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
}
}
}
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
public ExampleConfigBasedPipeline(XElement pipelineXml) : base(pipelineXml)
{
}
}
}
<?xml version="1.0" encoding="utf-8" ?>
<pipeline name="example">
<step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
<step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" />
<step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->
</pipeline>
using System;
namespace StronglyTypedPipelines
{
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
private IPipelineStep<INPUT, OUTPUT> _step;
private Func<INPUT, bool> _choice;
public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
{
_choice = choice;
_step = step;
}
public OUTPUT Process(INPUT input)
{
if (_choice(input))
{
return _step.Process(input);
}
else
{
return input;
}
}
}
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
private IPipelineStep<INPUT, OUTPUT> _first;
private IPipelineStep<INPUT, OUTPUT> _second;
private Func<INPUT, bool> _choice;
public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
{
_choice = choice;
_first = first;
_second = second;
}
public OUTPUT Process(INPUT input)
{
if (_choice(input))
{
return _first.Process(input);
}
else
{
return _second.Process(input);
}
}
}
}
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
class Program
{
private static void BasicPipelineTest()
{
Console.WriteLine("Basic Pipeline Test");
var input = 49;
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
var pipeline = new BasicPipeline();
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
Console.WriteLine();
}
private static void NestedPipelineTest()
{
Console.WriteLine("Nested Pipeline Test");
var input = 103;
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
var pipeline = new NestedPipeline();
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
Console.WriteLine();
}
private static void BranchingPipelineTest()
{
Console.WriteLine("Branching Pipeline Test");
foreach(int input in new int[] { 1, 10, 100, 1000 })
{
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
var pipeline = new BranchingPipeline();
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
}
Console.WriteLine();
}
private static void ModifiablePipelineTest()
{
Console.WriteLine("Configured Pipeline Test");
var input = 13;
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
XDocument xd = XDocument.Load("ConfigBasedPipeline.xml");
            //
// Patching the configuration data would go here
//
XElement pipelineXml = xd.Document.Element("pipeline");
try
{
var pipeline = new ExampleConfigBasedPipeline(pipelineXml);
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
}
catch(Exception ex)
{
Console.WriteLine("ERROR: " + ex.Message);
}
Console.WriteLine();
}
static void Main(string[] args)
{
BasicPipelineTest();
NestedPipelineTest();
BranchingPipelineTest();
ModifiablePipelineTest();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.