Skip to content

Instantly share code, notes, and snippets.

@jermdavis
Last active June 11, 2021 07:48
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
Extending the code-first strongly typed pipelines to show how they might be loaded from config
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// A base interface required so that reflection code can create a Step from its type name,
/// without needing to understand its generic parameters first.
/// </summary>
public interface IPipelineStep
{
}
/// <summary>
/// Base type for individual pipeline steps.
/// Descendants of this type map an input value to an output value.
/// The input and output types can differ.
/// </summary>
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
OUTPUT Process(INPUT input);
}
/// <summary>
/// An extension method for combining PipelineSteps together into a data flow.
/// </summary>
public static class PipelineStepExtensions
{
public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
{
return step.Process(input);
}
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types can use their constructor to compile a set of PipelineSteps together
/// the PipelineStepExtensions.Step() method, and assign this to the PipelineSteps property here.
/// The initial and final types of the set of steps must match the input and output types of this class,
/// but the intermediate types can vary.
/// </summary>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }
public OUTPUT Process(INPUT input)
{
return PipelineSteps(input);
}
}
}
namespace StronglyTypedPipelines
{
public class BasicPipeline : Pipeline<int, float>
{
public BasicPipeline()
{
PipelineSteps = input => input
.Step(new DoubleStep())
.Step(new ToStringStep())
.Step(new DuplicateStep())
.Step(new ToFloatStep());
}
}
public class InnerPipeline : Pipeline<int, int>
{
public InnerPipeline()
{
PipelineSteps = input => input
.Step(new ThirdStep())
.Step(new RoundStep());
}
}
public class NestedPipeline : Pipeline<int, string>
{
public NestedPipeline()
{
PipelineSteps = input => input
.Step(new DoubleStep())
.Step(new InnerPipeline())
.Step(new ToStringStep());
}
}
public class BranchingPipeline : Pipeline<int, string>
{
public BranchingPipeline()
{
PipelineSteps = input => input
.Step(new OptionalStep<int, int>(
f => f > 50,
new InnerPipeline()
))
.Step(new ChoiceStep<int, int>(
f => f > 100,
new HalfStep(),
new DoubleStep()
))
.Step(new ToStringStep());
}
}
}
namespace StronglyTypedPipelines
{
public class DoubleStep : IPipelineStep<int, int>
{
public int Process(int input)
{
return input * 2;
}
}
public class HalfStep : IPipelineStep<int,int>
{
public int Process(int input)
{
return input / 2;
}
}
public class ThirdStep : IPipelineStep<int, float>
{
public float Process(int input)
{
return input / 3f;
}
}
public class RoundStep : IPipelineStep<float, int>
{
public int Process(float input)
{
return (int)input;
}
}
public class ToStringStep : IPipelineStep<int, string>
{
public string Process(int input)
{
return input.ToString();
}
}
public class DuplicateStep : IPipelineStep<string, string>
{
public string Process(string input)
{
return input + "." + input;
}
}
public class ToFloatStep : IPipelineStep<string, float>
{
public float Process(string input)
{
return float.Parse(input);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
public ConfigBasedPipeline(XElement pipelineXml)
{
if (pipelineXml == null)
{
throw new ArgumentNullException(nameof(pipelineXml));
}
var pipelineSteps = parsePipelineSteps(pipelineXml);
validatePipelineSteps(pipelineSteps);
PipelineSteps = input => processPipelineSteps(pipelineSteps, input);
}
private OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, INPUT input)
{
object output = input;
foreach (IPipelineStep step in pipelineSteps)
{
MethodInfo mi = step.GetType().GetMethod("Process", BindingFlags.Public | BindingFlags.Instance);
output = mi.Invoke(step, new[] { output });
}
return (OUTPUT)output;
}
private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
{
var pipeline = new List<IPipelineStep>();
foreach (var xStep in pipelineXml.Elements("step"))
{
string typeName = xStep.Attribute("type").Value;
var type = Type.GetType(typeName);
var ctr = type.GetConstructor(Type.EmptyTypes);
var obj = (IPipelineStep)ctr.Invoke(Type.EmptyTypes);
pipeline.Add(obj);
}
return pipeline;
}
private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
{
int stepNumber = 0;
Type pipelineBaseInterface = this.GetType().GetInterface("IPipelineStep`2");
Type currentInputType = pipelineBaseInterface.GenericTypeArguments[0];
Type outputType = pipelineBaseInterface.GenericTypeArguments[1];
foreach (var step in pipelineSteps)
{
stepNumber += 1;
Type stepBaseInterface = step.GetType().GetInterface("IPipelineStep`2");
Type stepInType = stepBaseInterface.GenericTypeArguments[0];
Type stepOutType = stepBaseInterface.GenericTypeArguments[1];
if (currentInputType != stepInType)
{
string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
}
currentInputType = stepOutType;
}
if (currentInputType != outputType)
{
string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
}
}
}
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
public ExampleConfigBasedPipeline(XElement pipelineXml) : base(pipelineXml)
{
}
}
}
<?xml version="1.0" encoding="utf-8" ?>
<pipeline name="example">
<step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
<step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" />
<step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->
</pipeline>
using System;
namespace StronglyTypedPipelines
{
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
private IPipelineStep<INPUT, OUTPUT> _step;
private Func<INPUT, bool> _choice;
public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
{
_choice = choice;
_step = step;
}
public OUTPUT Process(INPUT input)
{
if (_choice(input))
{
return _step.Process(input);
}
else
{
return input;
}
}
}
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
private IPipelineStep<INPUT, OUTPUT> _first;
private IPipelineStep<INPUT, OUTPUT> _second;
private Func<INPUT, bool> _choice;
public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
{
_choice = choice;
_first = first;
_second = second;
}
public OUTPUT Process(INPUT input)
{
if (_choice(input))
{
return _first.Process(input);
}
else
{
return _second.Process(input);
}
}
}
}
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
class Program
{
private static void BasicPipelineTest()
{
Console.WriteLine("Basic Pipeline Test");
var input = 49;
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
var pipeline = new BasicPipeline();
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
Console.WriteLine();
}
private static void NestedPipelineTest()
{
Console.WriteLine("Nested Pipeline Test");
var input = 103;
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
var pipeline = new NestedPipeline();
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
Console.WriteLine();
}
private static void BranchingPipelineTest()
{
Console.WriteLine("Branching Pipeline Test");
foreach(int input in new int[] { 1, 10, 100, 1000 })
{
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
var pipeline = new BranchingPipeline();
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
}
Console.WriteLine();
}
private static void ModifiablePipelineTest()
{
Console.WriteLine("Configured Pipeline Test");
var input = 13;
Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name));
XDocument xd = XDocument.Load("ConfigBasedPipeline.xml");
            //
// Patching the configuration data would go here
//
XElement pipelineXml = xd.Document.Element("pipeline");
try
{
var pipeline = new ExampleConfigBasedPipeline(pipelineXml);
var output = pipeline.Process(input);
Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name));
}
catch(Exception ex)
{
Console.WriteLine("ERROR: " + ex.Message);
}
Console.WriteLine();
}
static void Main(string[] args)
{
BasicPipelineTest();
NestedPipelineTest();
BranchingPipelineTest();
ModifiablePipelineTest();
}
}
}
@wfigueiredo
Copy link

Hi Jeremy!
Sorry for the delay... I've not been feeling well the last few days (not Covid, though)

Yeah, you nailed it: it was not the order, but the async behaviour instead (wow, I'm really confused by all of this, lol...)
As I said, I'm new to C#/.NET so thanks again for your patience.

I did include your latest suggestion for AsyncPipelineStepExtensions and now works beautiful!
I think you really should include this Async approach to your gists... It helped me a lot, and can be of use for other devs too!
Amazing job, I really appreciate it! =D

gonna keep testing new scenarios, but I think most of the infrastructure is now solid enough to go on.
Thanks!!!! =)

@jermdavis
Copy link
Author

Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?

Glad you're getting on top of all this though - happy have been able to help. Certainly not going to waste all this interesting material - what I've worked out will end up on my blog and I'll add new gists for the changed code as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment