Skip to content

Instantly share code, notes, and snippets.

@jermdavis
Last active December 20, 2021 16:08
Show Gist options
  • Save jermdavis/67cf5d834ad720bb5f57c1a556c6b3d7 to your computer and use it in GitHub Desktop.
Save jermdavis/67cf5d834ad720bb5f57c1a556c6b3d7 to your computer and use it in GitHub Desktop.
An example of strongly typed data pipelines, with some trivial examples
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// Contract for a single pipeline step.
/// An implementation maps one input value to one output value; the two types may differ.
/// </summary>
/// <typeparam name="INPUT">
/// Type of value the step consumes. Declared contravariant because it is only ever
/// used as a parameter, so a step accepting a base type can stand in for one
/// accepting a derived reference type.
/// </typeparam>
/// <typeparam name="OUTPUT">
/// Type of value the step produces. Declared covariant because it is only ever
/// used as a return value, so a step returning a derived reference type can stand
/// in for one returning a base type.
/// </typeparam>
public interface IPipelineStep<in INPUT, out OUTPUT>
{
    /// <summary>
    /// Transforms <paramref name="input"/> into this step's output value.
    /// </summary>
    /// <param name="input">The value to transform.</param>
    /// <returns>The transformed value.</returns>
    OUTPUT Process(INPUT input);
}
/// <summary>
/// Extension method that lets pipeline steps be chained fluently into a data flow:
/// <c>value.Step(a).Step(b)</c> feeds the output of <c>a</c> into <c>b</c>.
/// </summary>
public static class PipelineStepExtensions
{
    /// <summary>
    /// Runs <paramref name="input"/> through <paramref name="step"/> and returns the step's result.
    /// </summary>
    /// <param name="input">The value handed to the step.</param>
    /// <param name="step">The step to execute.</param>
    /// <returns>Whatever <paramref name="step"/> produces for <paramref name="input"/>.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="step"/> is null.</exception>
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
    {
        // Fail with a descriptive argument error rather than a bare NullReferenceException
        // raised from the middle of a long fluent chain.
        if (step == null)
        {
            throw new ArgumentNullException("step");
        }

        return step.Process(input);
    }
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types can use their constructor to compile a set of pipeline steps together
/// using the PipelineStepExtensions.Step() method, and assign the result to the
/// <see cref="PipelineSteps"/> property here.
/// The initial and final types of the set of steps must match the input and output types
/// of this class, but the intermediate types can vary.
/// Because a pipeline is itself an IPipelineStep, it can be used as a step of another pipeline.
/// </summary>
/// <typeparam name="INPUT">Type of value accepted by the pipeline.</typeparam>
/// <typeparam name="OUTPUT">Type of value produced by the pipeline.</typeparam>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    /// <summary>
    /// The composed steps, expressed as a single function from input to output.
    /// Only this class and its descendants can assign it.
    /// </summary>
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    /// <summary>
    /// Runs <paramref name="input"/> through the composed steps.
    /// </summary>
    /// <param name="input">The value to feed into the first step.</param>
    /// <returns>The value produced by the final step.</returns>
    /// <exception cref="System.InvalidOperationException">
    /// <see cref="PipelineSteps"/> has not been assigned.
    /// </exception>
    public OUTPUT Process(INPUT input)
    {
        Func<INPUT, OUTPUT> steps = PipelineSteps;

        // A descendant that forgot to assign PipelineSteps would otherwise surface
        // as an unexplained NullReferenceException on the delegate invocation.
        if (steps == null)
        {
            throw new InvalidOperationException(
                string.Format("{0} has no PipelineSteps assigned; set the property before calling Process.", GetType().Name));
        }

        return steps(input);
    }
}
}
namespace StronglyTypedPipelines
{
/// <summary>
/// Linear int-to-float pipeline: doubles the input, renders it as text,
/// duplicates that text around a "." and parses the result as a float.
/// </summary>
public class BasicPipeline : Pipeline<int, float>
{
    /// <summary>
    /// Wires the four steps together. The step objects are created each time the
    /// pipeline function runs, in the order they are applied.
    /// </summary>
    public BasicPipeline()
    {
        PipelineSteps = number =>
        {
            int doubled = number.Step(new DoubleStep());
            string text = doubled.Step(new ToStringStep());
            string repeated = text.Step(new DuplicateStep());
            return repeated.Step(new ToFloatStep());
        };
    }
}
/// <summary>
/// Small int-to-int pipeline: divides the input by three as a float,
/// then converts the quotient back to an int.
/// </summary>
public class InnerPipeline : Pipeline<int, int>
{
    /// <summary>
    /// Wires ThirdStep followed by RoundStep.
    /// </summary>
    public InnerPipeline()
    {
        PipelineSteps = number =>
        {
            float third = number.Step(new ThirdStep());
            return third.Step(new RoundStep());
        };
    }
}
/// <summary>
/// Demonstrates nesting: an entire InnerPipeline is used as one step
/// between a doubling step and a to-string step.
/// </summary>
public class NestedPipeline : Pipeline<int, string>
{
    /// <summary>
    /// Wires DoubleStep, then InnerPipeline, then ToStringStep.
    /// </summary>
    public NestedPipeline()
    {
        PipelineSteps = number =>
        {
            int doubled = number.Step(new DoubleStep());
            int reduced = doubled.Step(new InnerPipeline());
            return reduced.Step(new ToStringStep());
        };
    }
}
/// <summary>
/// Demonstrates conditional steps: an optional step followed by a two-way choice,
/// with the final int rendered as a string.
/// </summary>
public class BranchingPipeline : Pipeline<int, string>
{
    /// <summary>
    /// Wires the conditional flow:
    /// values above 50 pass through InnerPipeline (others are left untouched);
    /// the result is then halved when above 100 and doubled otherwise.
    /// </summary>
    public BranchingPipeline()
    {
        PipelineSteps = number =>
        {
            var reduceWhenLarge = new OptionalStep<int, int>(
                candidate => candidate > 50,
                new InnerPipeline());
            int afterOptional = number.Step(reduceWhenLarge);

            var halveOrDouble = new ChoiceStep<int, int>(
                candidate => candidate > 100,
                new HalfStep(),
                new DoubleStep());
            int afterChoice = afterOptional.Step(halveOrDouble);

            return afterChoice.Step(new ToStringStep());
        };
    }
}
}
namespace StronglyTypedPipelines
{
/// <summary>
/// Step that multiplies an int by two.
/// </summary>
public class DoubleStep : IPipelineStep<int, int>
{
    /// <summary>Returns twice <paramref name="input"/>.</summary>
    public int Process(int input)
    {
        int doubled = 2 * input;
        return doubled;
    }
}
/// <summary>
/// Step that halves an int using integer division.
/// </summary>
public class HalfStep : IPipelineStep<int, int>
{
    /// <summary>
    /// Returns <paramref name="input"/> divided by two; any remainder is discarded
    /// because both operands are ints.
    /// </summary>
    public int Process(int input)
    {
        int half = input / 2;
        return half;
    }
}
/// <summary>
/// Step that divides an int by three, producing a float.
/// </summary>
public class ThirdStep : IPipelineStep<int, float>
{
    // Float divisor, so the division is carried out in floating point.
    private const float Divisor = 3f;

    /// <summary>Returns <paramref name="input"/> divided by three as a float.</summary>
    public float Process(int input)
    {
        float third = input / Divisor;
        return third;
    }
}
/// <summary>
/// Step that converts a float to an int.
/// </summary>
public class RoundStep : IPipelineStep<float, int>
{
    /// <summary>
    /// Converts <paramref name="input"/> with an explicit cast. Despite the class name,
    /// the cast drops the fractional part (truncation toward zero) rather than
    /// rounding to the nearest integer.
    /// </summary>
    public int Process(float input)
    {
        int whole = (int)input;
        return whole;
    }
}
/// <summary>
/// Step that renders an int as text.
/// </summary>
public class ToStringStep : IPipelineStep<int, string>
{
    /// <summary>Returns the result of calling ToString() on <paramref name="input"/>.</summary>
    public string Process(int input)
    {
        string text = input.ToString();
        return text;
    }
}
/// <summary>
/// Step that repeats a string on both sides of a "." separator.
/// </summary>
public class DuplicateStep : IPipelineStep<string, string>
{
    /// <summary>
    /// Returns <paramref name="input"/>, a ".", and <paramref name="input"/> again,
    /// joined into one string (for example "98" becomes "98.98").
    /// </summary>
    public string Process(string input)
    {
        return string.Concat(input, ".", input);
    }
}
/// <summary>
/// Step that parses text into a float.
/// </summary>
public class ToFloatStep : IPipelineStep<string, float>
{
    /// <summary>
    /// Parses <paramref name="input"/> as a float using the invariant culture.
    /// </summary>
    /// <param name="input">Text such as "98.98", with "." as the decimal separator.</param>
    /// <returns>The parsed value.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="input"/> is null.</exception>
    /// <exception cref="System.FormatException"><paramref name="input"/> is not a valid number.</exception>
    public float Process(string input)
    {
        // The text reaching this step is assembled with a hard-coded "." (see DuplicateStep),
        // so it must not be interpreted with the current culture: under a comma-decimal
        // culture that would throw or silently yield a different number.
        return float.Parse(input, System.Globalization.CultureInfo.InvariantCulture);
    }
}
}
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// Wraps another step so that it only runs when a predicate accepts the input.
/// When the predicate rejects the input, the input itself is returned; the
/// <c>INPUT : OUTPUT</c> constraint is what makes that pass-through type-safe.
/// </summary>
/// <typeparam name="INPUT">Type of value consumed; must be convertible to <typeparamref name="OUTPUT"/>.</typeparam>
/// <typeparam name="OUTPUT">Type of value produced.</typeparam>
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private readonly Func<INPUT, bool> _shouldRun;
    private readonly IPipelineStep<INPUT, OUTPUT> _wrapped;

    /// <summary>
    /// Creates the conditional wrapper.
    /// </summary>
    /// <param name="choice">Predicate deciding whether <paramref name="step"/> runs for a given input.</param>
    /// <param name="step">The step to run when <paramref name="choice"/> returns true.</param>
    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        _wrapped = step;
        _shouldRun = choice;
    }

    /// <summary>
    /// Runs the wrapped step when the predicate accepts <paramref name="input"/>;
    /// otherwise returns <paramref name="input"/> unchanged.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        if (!_shouldRun(input))
        {
            return input;
        }

        return _wrapped.Process(input);
    }
}
/// <summary>
/// Routes each input to one of two alternative steps, chosen by a predicate.
/// Exactly one of the two steps runs for every input, so (unlike a pass-through
/// step) no relationship between <typeparamref name="INPUT"/> and
/// <typeparamref name="OUTPUT"/> is required.
/// </summary>
/// <typeparam name="INPUT">Type of value consumed by both alternatives.</typeparam>
/// <typeparam name="OUTPUT">Type of value produced by both alternatives.</typeparam>
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly Func<INPUT, bool> _choice;
    private readonly IPipelineStep<INPUT, OUTPUT> _first;
    private readonly IPipelineStep<INPUT, OUTPUT> _second;

    /// <summary>
    /// Creates the two-way branch.
    /// </summary>
    /// <param name="choice">Predicate evaluated against each input.</param>
    /// <param name="first">Step run when <paramref name="choice"/> returns true.</param>
    /// <param name="second">Step run when <paramref name="choice"/> returns false.</param>
    public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
    {
        _choice = choice;
        _first = first;
        _second = second;
    }

    /// <summary>
    /// Evaluates the predicate on <paramref name="input"/> and returns the result of
    /// the selected step.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        IPipelineStep<INPUT, OUTPUT> selected = _choice(input) ? _first : _second;
        return selected.Process(input);
    }
}
}
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// Console driver that runs each example pipeline and prints every input and
/// output value together with its runtime type name.
/// </summary>
class Program
{
    private const string InputFormat = "Input Value: {0} [{1}]";
    private const string OutputFormat = "Output Value: {0} [{1}]";

    // Writes one line containing the value and the name of its runtime type.
    private static void Report(string format, object value)
    {
        Console.WriteLine(string.Format(format, value, value.GetType().Name));
    }

    // Runs BasicPipeline once with the input 49.
    private static void BasicPipelineTest()
    {
        Console.WriteLine("Basic Pipeline Test");

        int seed = 49;
        Report(InputFormat, seed);

        BasicPipeline pipeline = new BasicPipeline();
        float result = pipeline.Process(seed);
        Report(OutputFormat, result);

        Console.WriteLine();
    }

    // Runs NestedPipeline once with the input 103.
    private static void NestedPipelineTest()
    {
        Console.WriteLine("Nested Pipeline Test");

        int seed = 103;
        Report(InputFormat, seed);

        NestedPipeline pipeline = new NestedPipeline();
        string result = pipeline.Process(seed);
        Report(OutputFormat, result);

        Console.WriteLine();
    }

    // Runs BranchingPipeline over several inputs; a fresh pipeline is built for each one.
    private static void BranchingPipelineTest()
    {
        Console.WriteLine("Branching Pipeline Test");

        int[] seeds = { 1, 10, 100, 1000 };
        for (int index = 0; index < seeds.Length; index++)
        {
            int seed = seeds[index];
            Report(InputFormat, seed);

            BranchingPipeline pipeline = new BranchingPipeline();
            string result = pipeline.Process(seed);
            Report(OutputFormat, result);
        }

        Console.WriteLine();
    }

    /// <summary>Entry point: runs the three demonstrations in order.</summary>
    static void Main(string[] args)
    {
        BasicPipelineTest();
        NestedPipelineTest();
        BranchingPipelineTest();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment