Skip to content

Instantly share code, notes, and snippets.

@jermdavis
Last active June 11, 2021 07:48
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
Extending the code-first strongly typed pipelines to show how they might be loaded from config
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// Non-generic marker shared by every pipeline step. Reflection code (such as the config-based
/// loader, which instantiates steps from type names) can create and hold steps through this type
/// without first knowing their generic arguments.
/// </summary>
public interface IPipelineStep { }

/// <summary>
/// A single pipeline step that maps one value of type <typeparamref name="TInput"/> to one value
/// of type <typeparamref name="TOutput"/>. The two types may be the same or different.
/// </summary>
/// <typeparam name="TInput">Type of value the step consumes.</typeparam>
/// <typeparam name="TOutput">Type of value the step produces.</typeparam>
public interface IPipelineStep<TInput, TOutput> : IPipelineStep
{
    /// <summary>Transforms <paramref name="input"/> into this step's output value.</summary>
    TOutput Process(TInput input);
}
/// <summary>
/// An extension method for combining PipelineSteps together into a data flow.
/// Because it extends any value, calls chain naturally: <c>input.Step(a).Step(b)</c> feeds each
/// step's output into the next, and the generic signature makes the compiler check that the
/// output type of one step matches the input type of the next.
/// </summary>
public static class PipelineStepExtensions
{
    /// <summary>
    /// Runs <paramref name="step"/> on <paramref name="input"/> and returns the step's result.
    /// </summary>
    /// <param name="input">The value to process. It is passed to the step as-is, including null.</param>
    /// <param name="step">The step to apply.</param>
    /// <returns>Whatever <paramref name="step"/> produces for <paramref name="input"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="step"/> is null.</exception>
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
    {
        // Report a missing step as an argument error rather than a NullReferenceException
        // thrown from somewhere inside a long chain of Step() calls.
        if (step == null)
        {
            throw new ArgumentNullException(nameof(step));
        }

        return step.Process(input);
    }
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types can use their constructor to compile a set of PipelineSteps together using
/// the PipelineStepExtensions.Step() method, and assign the resulting delegate to the PipelineSteps property here.
/// The initial and final types of the set of steps must match the input and output types of this class,
/// but the intermediate types can vary.
/// Because a pipeline is itself an IPipelineStep, it can be used as a single step inside another pipeline.
/// </summary>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    /// <summary>
    /// The composed chain of steps. Descendant types are expected to assign this, typically in their constructor.
    /// </summary>
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    /// <summary>
    /// Runs <paramref name="input"/> through the composed steps.
    /// </summary>
    /// <exception cref="InvalidOperationException">The descendant type never assigned <see cref="PipelineSteps"/>.</exception>
    public OUTPUT Process(INPUT input)
    {
        var steps = PipelineSteps;
        if (steps == null)
        {
            // Give a descriptive error instead of a NullReferenceException when a subclass forgot to
            // build its step chain.
            throw new InvalidOperationException(string.Format("{0} has not assigned its PipelineSteps.", GetType().Name));
        }

        return steps(input);
    }
}
}
namespace StronglyTypedPipelines
{
/// <summary>
/// Int-to-float demo pipeline: doubles the number, converts it to text, repeats that text on
/// either side of a "." and parses the result as a float (49 becomes the text "98.98" before parsing).
/// </summary>
public class BasicPipeline : Pipeline<int, float>
{
    public BasicPipeline()
    {
        PipelineSteps = value =>
        {
            int doubled = value.Step(new DoubleStep());
            string asText = doubled.Step(new ToStringStep());
            string repeated = asText.Step(new DuplicateStep());
            return repeated.Step(new ToFloatStep());
        };
    }
}
/// <summary>
/// Int-to-int pipeline that divides by three (as a float) and casts the result back to an int.
/// It is used as a nested step by NestedPipeline and BranchingPipeline.
/// </summary>
public class InnerPipeline : Pipeline<int, int>
{
    public InnerPipeline()
    {
        PipelineSteps = value =>
        {
            float third = value.Step(new ThirdStep());
            return third.Step(new RoundStep());
        };
    }
}
/// <summary>
/// Demonstrates that a whole pipeline can act as one step: doubles the input, runs it through
/// an InnerPipeline, and converts the result to a string.
/// </summary>
public class NestedPipeline : Pipeline<int, string>
{
    public NestedPipeline()
    {
        PipelineSteps = value =>
        {
            int doubled = value.Step(new DoubleStep());
            int reduced = doubled.Step(new InnerPipeline());
            return reduced.Step(new ToStringStep());
        };
    }
}
/// <summary>
/// Demonstrates conditional steps. Values above 50 are first passed through an InnerPipeline
/// (other values pass through unchanged). The result is then halved if it is above 100 and
/// doubled otherwise, and finally converted to a string.
/// </summary>
public class BranchingPipeline : Pipeline<int, string>
{
    public BranchingPipeline()
    {
        PipelineSteps = value =>
        {
            int adjusted = value.Step(new OptionalStep<int, int>(
                n => n > 50,
                new InnerPipeline()));
            int scaled = adjusted.Step(new ChoiceStep<int, int>(
                n => n > 100,
                new HalfStep(),
                new DoubleStep()));
            return scaled.Step(new ToStringStep());
        };
    }
}
}
namespace StronglyTypedPipelines
{
/// <summary>
/// Multiplies its integer input by two. In the default unchecked context, results beyond
/// the int range wrap around.
/// </summary>
public class DoubleStep : IPipelineStep<int, int>
{
    public int Process(int input) => 2 * input;
}
/// <summary>
/// Halves its integer input using integer division, which truncates toward zero
/// (7 becomes 3, -7 becomes -3).
/// </summary>
public class HalfStep : IPipelineStep<int, int>
{
    public int Process(int input) => input / 2;
}
/// <summary>
/// Divides its integer input by three using float arithmetic, so the fractional part is kept
/// (206 becomes roughly 68.667).
/// </summary>
public class ThirdStep : IPipelineStep<int, float>
{
    public float Process(int input) => input / 3f;
}
/// <summary>
/// Converts a float to an int with a plain cast. Despite the name, this truncates toward zero
/// (68.67f becomes 68, -2.5f becomes -2) rather than rounding to the nearest integer.
/// NOTE(review): NaN or out-of-range values give an unspecified result in an unchecked context.
/// </summary>
public class RoundStep : IPipelineStep<float, int>
{
    public int Process(float input) => (int)input;
}
/// <summary>
/// Converts an integer to its decimal text. The invariant culture is used, so the text is identical
/// on every machine and can be parsed back by machine-oriented code.
/// </summary>
public class ToStringStep : IPipelineStep<int, string>
{
    public string Process(int input)
    {
        // Culture-specific formatting can differ between machines; for example, some cultures
        // use U+2212 rather than '-' as the negative sign, which invariant parsing rejects.
        return input.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }
}
/// <summary>
/// Repeats its text on either side of a "." separator ("49" becomes "49.49").
/// A null input is treated as empty, giving ".".
/// </summary>
public class DuplicateStep : IPipelineStep<string, string>
{
    public string Process(string input) => string.Concat(input, ".", input);
}
/// <summary>
/// Parses text as a float. Parsing uses the invariant culture, so "." is always the decimal
/// separator (matching the "." that DuplicateStep inserts), regardless of the machine's regional settings.
/// </summary>
/// <exception cref="System.ArgumentNullException">The input is null.</exception>
/// <exception cref="System.FormatException">The input is not a valid invariant-culture number.</exception>
public class ToFloatStep : IPipelineStep<string, float>
{
    public float Process(string input)
    {
        // float.Parse(string) uses the current culture, which may treat "." as a thousands
        // separator (e.g. de-DE can read "98.98" as 9898), so pin the culture explicitly.
        return float.Parse(input, System.Globalization.CultureInfo.InvariantCulture);
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
/// <summary>
/// A pipeline whose steps are listed in XML instead of being compiled together in code.
/// Each &lt;step type="..."/&gt; child element names a step type, resolved with Type.GetType.
/// That type must be a concrete class with a public parameterless constructor that implements
/// exactly one IPipelineStep&lt;,&gt;.
/// The chain of step input/output types is checked once, in the constructor. A misconfigured
/// pipeline therefore fails when it is built rather than part-way through a call to Process().
/// </summary>
/// <typeparam name="INPUT">Type accepted by the first configured step.</typeparam>
/// <typeparam name="OUTPUT">Type produced by the last configured step.</typeparam>
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
    /// <summary>
    /// Builds the pipeline from a &lt;pipeline&gt; element.
    /// </summary>
    /// <param name="pipelineXml">Element whose &lt;step&gt; children define the steps, in order.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipelineXml"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A step cannot be created, or the step types do not chain from INPUT to OUTPUT.
    /// </exception>
    public ConfigBasedPipeline(XElement pipelineXml)
    {
        if (pipelineXml == null)
        {
            throw new ArgumentNullException(nameof(pipelineXml));
        }

        var pipelineSteps = parsePipelineSteps(pipelineXml);
        var processMethods = validatePipelineSteps(pipelineSteps);
        PipelineSteps = input => processPipelineSteps(pipelineSteps, processMethods, input);
    }

    /// <summary>
    /// Passes the value through each step in turn, using the Process methods resolved at construction.
    /// Exceptions thrown by a step propagate unwrapped, just as they do in a code-first pipeline.
    /// </summary>
    private static OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, IList<MethodInfo> processMethods, INPUT input)
    {
        object output = input;
        for (int i = 0; i < pipelineSteps.Count; i++)
        {
            try
            {
                output = processMethods[i].Invoke(pipelineSteps[i], new[] { output });
            }
            catch (TargetInvocationException ex) when (ex.InnerException != null)
            {
                // MethodInfo.Invoke wraps whatever the step threw; rethrow the original exception
                // with its stack trace, so callers see the real error instead of the reflection wrapper.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
                throw;
            }
        }
        return (OUTPUT)output;
    }

    /// <summary>
    /// Creates one step instance per &lt;step&gt; element, in document order.
    /// </summary>
    /// <exception cref="InvalidOperationException">A step element cannot be turned into a step instance.</exception>
    private static IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
    {
        var pipeline = new List<IPipelineStep>();
        int stepNumber = 0;
        foreach (var xStep in pipelineXml.Elements("step"))
        {
            stepNumber += 1;

            // The explicit string conversion yields null (rather than throwing) when the attribute is missing.
            string typeName = (string)xStep.Attribute("type");
            if (string.IsNullOrWhiteSpace(typeName))
            {
                throw new InvalidOperationException(string.Format("Step #{0} has no type attribute.", stepNumber));
            }

            // Type.GetType(string) returns null when the named type cannot be found.
            Type type = Type.GetType(typeName);
            if (type == null)
            {
                throw new InvalidOperationException(string.Format("Step #{0} type {1} could not be found.", stepNumber, typeName));
            }

            if (!typeof(IPipelineStep).IsAssignableFrom(type))
            {
                throw new InvalidOperationException(string.Format("Step #{0} type {1} does not implement IPipelineStep.", stepNumber, typeName));
            }

            ConstructorInfo ctr = type.GetConstructor(Type.EmptyTypes);
            if (ctr == null || type.IsAbstract || type.ContainsGenericParameters)
            {
                throw new InvalidOperationException(string.Format("Step #{0} type {1} is not a concrete type with a public parameterless constructor.", stepNumber, typeName));
            }

            pipeline.Add((IPipelineStep)ctr.Invoke(new object[0]));
        }
        return pipeline;
    }

    /// <summary>
    /// Checks that each step's input type equals the previous step's output type, starting from INPUT,
    /// and that the final type equals OUTPUT. It also resolves the Process method for each step.
    /// </summary>
    /// <returns>The IPipelineStep&lt;,&gt;.Process method for each step, in step order.</returns>
    /// <exception cref="InvalidOperationException">The step types do not chain from INPUT to OUTPUT.</exception>
    private static IList<MethodInfo> validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
    {
        // The pipeline's own generic arguments are the chain's start and end types; this avoids
        // looking up IPipelineStep`2 on the runtime type, which is ambiguous if a subclass implements it twice.
        Type currentInputType = typeof(INPUT);
        Type outputType = typeof(OUTPUT);
        var processMethods = new List<MethodInfo>(pipelineSteps.Count);

        int stepNumber = 0;
        foreach (var step in pipelineSteps)
        {
            stepNumber += 1;
            Type stepBaseInterface = findStepInterface(step.GetType(), stepNumber);
            Type stepInType = stepBaseInterface.GenericTypeArguments[0];
            Type stepOutType = stepBaseInterface.GenericTypeArguments[1];
            if (currentInputType != stepInType)
            {
                string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
            }

            // Resolving Process through the interface works for explicit interface implementations.
            // It is also not confused by other public methods named Process; the original by-name
            // lookup on the concrete type would throw AmbiguousMatchException in that case.
            processMethods.Add(stepBaseInterface.GetMethod("Process"));
            currentInputType = stepOutType;
        }

        if (currentInputType != outputType)
        {
            if (pipelineSteps.Count == 0)
            {
                // Previously this case failed inside Last() with "Sequence contains no elements".
                string emptyMsg = "Pipeline has no steps, so its input type {0} does not equal its output type {1}.";
                throw new InvalidOperationException(string.Format(emptyMsg, currentInputType.Name, outputType.Name));
            }

            string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
            throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
        }

        return processMethods;
    }

    /// <summary>
    /// Finds the single closed IPipelineStep&lt;,&gt; interface implemented by a step type.
    /// </summary>
    /// <exception cref="InvalidOperationException">The type implements none, or more than one.</exception>
    private static Type findStepInterface(Type stepType, int stepNumber)
    {
        Type[] stepInterfaces = stepType.GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IPipelineStep<,>))
            .ToArray();

        if (stepInterfaces.Length == 1)
        {
            return stepInterfaces[0];
        }

        string msg = stepInterfaces.Length == 0
            ? "Step #{0} {1} does not implement IPipelineStep<INPUT, OUTPUT>."
            : "Step #{0} {1} implements more than one IPipelineStep<INPUT, OUTPUT>, so its input type is ambiguous.";
        throw new InvalidOperationException(string.Format(msg, stepNumber, stepType.Name));
    }
}
/// <summary>
/// Concrete int-to-string pipeline whose steps come entirely from the supplied XML.
/// </summary>
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
    /// <param name="pipelineXml">The &lt;pipeline&gt; element listing the steps; handed straight to the base class.</param>
    public ExampleConfigBasedPipeline(XElement pipelineXml)
        : base(pipelineXml)
    {
        // Parsing and validation of the steps happen in ConfigBasedPipeline's constructor.
    }
}
}
<?xml version="1.0" encoding="utf-8" ?>
<pipeline name="example">
<step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
<step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" />
<step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" />
<!-- Will error: <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->
</pipeline>
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// A step that runs an inner step only when a condition holds, and otherwise passes its input
/// through unchanged. The <c>INPUT : OUTPUT</c> constraint is what lets the unchanged input be
/// returned as the output.
/// </summary>
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private readonly IPipelineStep<INPUT, OUTPUT> _step;
    private readonly Func<INPUT, bool> _choice;

    /// <param name="choice">Returns true for inputs that should be processed by <paramref name="step"/>.</param>
    /// <param name="step">The step to run when <paramref name="choice"/> returns true.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        // Fail when the pipeline is built, rather than with a NullReferenceException on the first Process() call.
        if (choice == null)
        {
            throw new ArgumentNullException(nameof(choice));
        }
        if (step == null)
        {
            throw new ArgumentNullException(nameof(step));
        }

        _choice = choice;
        _step = step;
    }

    /// <summary>
    /// Returns the inner step's result when the condition holds for <paramref name="input"/>;
    /// otherwise returns <paramref name="input"/> itself.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        if (!_choice(input))
        {
            return input;
        }

        return _step.Process(input);
    }
}
/// <summary>
/// A step that picks one of two inner steps for each input: <c>first</c> when the condition
/// returns true, <c>second</c> otherwise.
/// Unlike OptionalStep, the input is never returned directly. No relationship between INPUT and
/// OUTPUT is therefore required (e.g. a ChoiceStep&lt;int, string&gt; is allowed).
/// </summary>
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly IPipelineStep<INPUT, OUTPUT> _first;
    private readonly IPipelineStep<INPUT, OUTPUT> _second;
    private readonly Func<INPUT, bool> _choice;

    /// <param name="choice">Returns true to select <paramref name="first"/>, false to select <paramref name="second"/>.</param>
    /// <param name="first">Step used when <paramref name="choice"/> returns true.</param>
    /// <param name="second">Step used when <paramref name="choice"/> returns false.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
    {
        // Fail when the pipeline is built, rather than with a NullReferenceException on the first Process() call.
        if (choice == null)
        {
            throw new ArgumentNullException(nameof(choice));
        }
        if (first == null)
        {
            throw new ArgumentNullException(nameof(first));
        }
        if (second == null)
        {
            throw new ArgumentNullException(nameof(second));
        }

        _choice = choice;
        _first = first;
        _second = second;
    }

    /// <summary>
    /// Runs whichever inner step the condition selects for <paramref name="input"/> and returns its result.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        IPipelineStep<INPUT, OUTPUT> selected = _choice(input) ? _first : _second;
        return selected.Process(input);
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
/// <summary>
/// Console harness that runs each example pipeline and prints the values going in and coming out.
/// </summary>
class Program
{
    /// <summary>
    /// Prints a labelled value with its runtime type name, e.g. "Input Value: 49 [Int32]".
    /// </summary>
    private static void WriteValue(string label, object value)
    {
        Console.WriteLine(string.Format("{0} Value: {1} [{2}]", label, value, value.GetType().Name));
    }

    private static void BasicPipelineTest()
    {
        Console.WriteLine("Basic Pipeline Test");
        var input = 49;
        WriteValue("Input", input);
        var pipeline = new BasicPipeline();
        WriteValue("Output", pipeline.Process(input));
        Console.WriteLine();
    }

    private static void NestedPipelineTest()
    {
        Console.WriteLine("Nested Pipeline Test");
        var input = 103;
        WriteValue("Input", input);
        var pipeline = new NestedPipeline();
        WriteValue("Output", pipeline.Process(input));
        Console.WriteLine();
    }

    private static void BranchingPipelineTest()
    {
        Console.WriteLine("Branching Pipeline Test");
        // Process() keeps no per-call state on the pipeline, so one instance can serve every input.
        var pipeline = new BranchingPipeline();
        foreach (int input in new int[] { 1, 10, 100, 1000 })
        {
            WriteValue("Input", input);
            WriteValue("Output", pipeline.Process(input));
        }
        Console.WriteLine();
    }

    private static void ModifiablePipelineTest()
    {
        Console.WriteLine("Configured Pipeline Test");
        var input = 13;
        WriteValue("Input", input);
        try
        {
            // Loading inside the try means a missing or malformed config file is reported like any
            // other configuration error, instead of terminating the program with an unhandled exception.
            XDocument xd = XDocument.Load("ConfigBasedPipeline.xml");
            //
            // Patching the configuration data would go here
            //
            XElement pipelineXml = xd.Element("pipeline");
            var pipeline = new ExampleConfigBasedPipeline(pipelineXml);
            WriteValue("Output", pipeline.Process(input));
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR: " + ex.Message);
        }
        Console.WriteLine();
    }

    static void Main(string[] args)
    {
        BasicPipelineTest();
        NestedPipelineTest();
        BranchingPipelineTest();
        ModifiablePipelineTest();
    }
}
}
@wfigueiredo
Copy link

/// <summary>
/// Async billing pipeline from the discussion: turns a BillingAccountDto into a BillingAccount by
/// chaining four injected steps with the Step() extension method.
/// </summary>
public class BillingCreatePipelineAsync : AsyncPipeline<BillingAccountDto, BillingAccount>
  {
      private readonly CreateBillingAccountPipelineStep _step1;
      private readonly CreateFeeTicketsPipelineStep _step2;
      private readonly PublishFeeTicketsPipelineStep _step3;
      private readonly UpdateBillingAccountSentStep _step4;

      /// <summary>Stores the four injected steps and composes them, in this order, into _pipelineSteps.</summary>
      public BillingCreatePipelineAsync(
          CreateBillingAccountPipelineStep step1,
          CreateFeeTicketsPipelineStep step2,
          PublishFeeTicketsPipelineStep step3,
          UpdateBillingAccountSentStep step4)
      {
          _step1 = step1;
          _step2 = step2;
          _step3 = step3;
          _step4 = step4;

          // Each Step() call is given the previous step's result (or the original input for _step1).
          // NOTE(review): with async steps, log output may interleave between steps; this does not by itself
          // mean the data flows out of order (see the discussion below).
          _pipelineSteps = Input => Input
              .Step(_step1)
              .Step(_step2)
              .Step(_step3)
              .Step(_step4);
      }
  }

In this case, the execution order is being triggered like this:

Step1 --> Step3 --> Step4 --> Step2 😨😨😨

@jermdavis
Copy link
Author

Ok, so two things:

Firstly, making the AsyncPipelineStepExtensions async: I'm not sure you need that? Given the step interface still has task-in-task-out, I think you just end up awaiting the same task twice there? Once in the extension method, and once in the next step? I don't think that matters because the first await will resolve the Task's value, and the second will then effectively do nothing because the Task already has its value. But I think the code works without it? Methods with a return type of Task<T> but no async modifier just return the task - they don't do any complicated compiler business to implement asynchronous behaviour. And in this case I think that's fine?

Secondly, ordering: Are the steps really happening out-of-order, or are you seeing the effects of how the whole async thing works? The internals of what the compiler does with async code are pretty complex, but it boils down to "while an await statement is waiting for the future value of a Task, let's try to run some other code if we can". Internally this is done with complicated state machines that allow you to write linear-looking code, while behind the scenes the compiler generates wrappers that allow that flow of execution to work despite there being the old pattern of "BeginSomething()" / "EndSomething()" asynchronicity under the hood?

(BTW I don't consider myself too knowledgeable on how this stuff works - so it's possible I'm misunderstanding stuff here - but this is how I think it works...)

And this has turned into a bit of an epic answer - sorry ;-)

Anyway, I think we can see the outline of that behaviour in a simple example. If I create three steps which look like this:

/// <summary>
/// Demo step (first version): waits for the incoming URI task, then downloads that page as a string.
/// The Console.WriteLine calls trace the order in which the async code runs.
/// </summary>
public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
	// One HttpClient shared by all instances, rather than a new client per request.
	private static readonly HttpClient _client = new HttpClient();

	public async Task<string> ProcessAsync(Task<Uri> Input)
	{
		Console.WriteLine("Entering Step 1");
		
		var uri = await Input;
		Console.WriteLine("HaveInput Step 1");
		
		// Logged before the download below is awaited, so "Leaving" does not mean the fetch has finished.
		Console.WriteLine("Leaving Step 1");
		
		return await _client.GetStringAsync(uri);
	}
}

The first step does an HTTP fetch for whatever URL comes in, and it outputs some text at three key points: On entry, after the input await and before it returns.

/// <summary>
/// Demo step (first version): waits for the incoming text, then replaces every "BBC" with "Not the BBC".
/// </summary>
public class ModifyTextAsyncStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		Console.WriteLine("Entering Step 2");
		
		// If the previous step's task is still running, this method returns an incomplete task here
		// and resumes later; that is why step 3 can log "Entering" before step 2 logs "HaveInput".
		var txt = await Input;
		Console.WriteLine("HaveInput Step 2");
		
		var output = txt.Replace("BBC", "Not the BBC");
		
		Console.WriteLine("Leaving Step 2");
		
		return output;
	}
}

The second step does some text replacement, and outputs similar messages.

/// <summary>
/// Demo step (first version): waits for the incoming text, writes it to test.txt and returns that file name.
/// </summary>
public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		Console.WriteLine("Entering Step 3");
		
		var data = await Input;
		Console.WriteLine("HaveInput Step 3");
		
		// Relative to the current working directory. The backslash separator is Windows-specific;
		// elsewhere it becomes part of the file name.
		var fileName = @".\test.txt";

		await System.IO.File.WriteAllTextAsync(fileName, data);

		Console.WriteLine("Leaving Step 3");

		return fileName;
	}
}

And then step three writes the changed data to disk, and does another set of WriteLines in the same pattern.

I don't think the particular bits of code are important - I just wanted something simple-ish to demonstrate the behaviour. I suspect you'd see something similar in your code...

Now, if I make a pipeline from those:

/// <summary>
/// Demo async pipeline: fetch a URL, edit the text, then write it to disk.
/// </summary>
public class ExampleAsyncPipeline : AsyncPipeline<Uri, string>
{
	public ExampleAsyncPipeline()
	{
		// NOTE(review): presumably each Step() call hands the previous step's (possibly unfinished) Task
		// straight to the next step, which is why the steps' log lines can interleave.
		_pipelineSteps = input => input
			.Step(new HttpFetchAsyncStep())
			.Step(new ModifyTextAsyncStep())
			.Step(new DiskWriteAsyncStep());

	}
}

It will run a pipeline with the steps in that order, but the output looks like:

Entering Step 1
HaveInput Step 1
Leaving Step 1
Entering Step 2
Entering Step 3
HaveInput Step 2
Leaving Step 2
HaveInput Step 3
Leaving Step 3

So you can see the steps don't happen completely linearly:

  • It calls step one, and that all looks like it happens in order, because its input is an "already completed" task, and it starts the async HTTP fetch, and returns the promise of that response value in the future.
  • The second step starts, and it awaits that input variable. The HTTP fetch hasn't completed here, so this work gets shunted off into the background, and the runtime looks for other work it can do. Interestingly (confusingly?), that other work is to start step three by passing in a task for "you'll get your data eventually". (More precisely: an async method runs synchronously only until it reaches an await on a task that hasn't finished yet. At that point it hands an incomplete Task back to its caller, so the chain of .Step() calls simply carries on and invokes step three straight away.) So it looks like step three starts before step two completes. And step three also gets stuck awaiting its input task. At this point the runtime has to look elsewhere for work to do. (If there is any - it may just block.)
  • Then the await for step two's input task completes and provides its value. That allows step two to complete the rest of its processing and return its output.
  • And that allows step three's input await to complete and hence it can do the rest of its processing.

So I think that's why you see apparently out-of-order execution? The runtime behind the async / await calls is making sure that where necessary, the flow of your program is preserved, but it tries to fill the time during the "waiting for a future value" bits of the long-running operations with other bits of work where it can?

Now thinking about it, I suspect this allows for some odd behaviour in the code here if anything "important" happens before awaiting the input in an individual step. (The delay between that code and the await completing could lead to deadlocks or other strangeness in some code.) That makes me want to abstract that await out of the step itself and hide it away in some base code. But that's trickier to achieve, as it needs more change. Quick attempt...

First, the pipeline definition needs to allow explicitly for both Task<TIn> and TIn:

/// <summary>
/// Async step contract (second version) with two entry points: one for an input that is still a
/// pending Task, and one for an input value that is already available.
/// </summary>
public interface IAsyncPipelineStep<TIn, TOut>
{
	Task<TOut> ProcessAsync(Task<TIn> Input);
	Task<TOut> ProcessAsync(TIn Input);
}

We'd already added that signature to AsyncPipeline<TIn, TOut> - so that doesn't change. And we can leave AsyncPipelineStepExtensions too, as it already has all its required methods.

To hide that input await we can have an abstract base for each pipeline step:

/// <summary>
/// Base class that hides the await on the input: concrete steps only implement the TIn overload
/// and never see the pending Task.
/// </summary>
public abstract class BaseAsyncStep<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	// Waits for the previous step's result, then delegates to the concrete step's TIn overload.
	public async Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		var input = await Input;
		return await ProcessAsync(input);
	}

	public abstract Task<TOut> ProcessAsync(TIn Input);
}

That handles the Task<TIn> overload by awaiting the input, and it delegates the TIn overload to the concrete class. So we can tweak the individual step definitions from my example above:

/// <summary>Demo step (second version): downloads the page at the given URI as a string.</summary>
public class HttpFetchAsyncStep : BaseAsyncStep<Uri, string>
{
	private static readonly HttpClient _client = new HttpClient();

	public override async Task<string> ProcessAsync(Uri Input)
	{
		Console.WriteLine("Entering Step 1");

		// Logged before the download below is awaited.
		Console.WriteLine("Leaving Step 1");

		return await _client.GetStringAsync(Input);
	}
}

/// <summary>Demo step (second version): replaces every "BBC" in the text with "CCB".</summary>
public class ModifyTextAsyncStep : BaseAsyncStep<string, string>
{
	// NOTE(review): this async method contains no await, so the compiler warns (CS1998) and it
	// completes synchronously; returning Task.FromResult(output) without async would avoid that.
	public override async Task<string> ProcessAsync(string Input)
	{
		Console.WriteLine("Entering Step 2");

		var output = Input.Replace("BBC", "CCB");

		Console.WriteLine("Leaving Step 2");

		return output;
	}
}

/// <summary>Demo step (second version): writes the text to a file and returns the file's path.</summary>
public class DiskWriteAsyncStep : BaseAsyncStep<string, string>
{
	public override async Task<string> ProcessAsync(string Input)
	{
		Console.WriteLine("Entering Step 3");

		// NOTE(review): hard-coded, user-specific path; it only works on the author's machine.
		var fileName = @"C:\Users\jeremy.davis\Desktop\test.txt";

		await System.IO.File.WriteAllTextAsync(fileName, Input);

		Console.WriteLine("Leaving Step 3");

		return fileName;
	}
}

They now override that abstract TIn method, and never have to await their input.

If I run that, the output becomes

Entering Step 1
Leaving Step 1
Entering Step 2
Leaving Step 2
Entering Step 3
Leaving Step 3

Which makes it look all nice and synchronous. But actually all we did was hide the ordering oddity from above. If I add some extra debug data to the abstract step type:

/// <summary>
/// Same as the earlier BaseAsyncStep, but with debug output around the input await. It shows that
/// several steps can be waiting for their input at the same time.
/// </summary>
public abstract class BaseAsyncStep<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public async Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		Console.WriteLine($"Awaiting input for {this.GetType().Name}");
		
		var input = await Input;
		
		Console.WriteLine($"Got input for {this.GetType().Name}");
		
		return await ProcessAsync(input);
	}

	public abstract Task<TOut> ProcessAsync(TIn Input);
}

Then you can see that the ordering still has the whole "await will wander off and do other stuff" behaviour:

Awaiting input for HttpFetchAsyncStep
Got input for HttpFetchAsyncStep
Entering Step 1
Leaving Step 1
Awaiting input for ModifyTextAsyncStep
Awaiting input for DiskWriteAsyncStep
Got input for ModifyTextAsyncStep
Entering Step 2
Leaving Step 2
Got input for DiskWriteAsyncStep
Entering Step 3
Leaving Step 3

Difference here being that it's harder for someone writing a concrete step to accidentally mess up because of that...

Does that make sense?

@jermdavis
Copy link
Author

Actually - having thought about this harder, I think there's a better solution that makes all this simpler. We can move all the logic for handling a Task<TIn> into the extension method class, and simplify everything else...

Instead of the second batch of code from the previous comment...

/// <summary>
/// Async step contract (final version): steps only ever receive an already-available input value.
/// </summary>
public interface IAsyncPipelineStep<TIn, TOut>
{
	Task<TOut> ProcessAsync(TIn Input);
}

/// <summary>
/// Base for async pipelines. Descendants assign _pipelineSteps; a pipeline is itself a step, so it can be nested.
/// NOTE(review): public property with a field-style name; PipelineSteps would follow .NET naming,
/// but the other snippets refer to _pipelineSteps.
/// </summary>
public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(TIn Input)
	{
		return _pipelineSteps(Input);
	}
}

/// <summary>
/// Chaining helpers. The await on the previous step's Task lives here, so no step can run code before its input is ready.
/// The Console.WriteLine calls are debug tracing only.
/// </summary>
public static class AsyncPipelineStepExtensions
{
	// Used for every step after the first: the receiver is the previous step's Task.
	public async static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		Console.WriteLine($"Awaiting input for {Step.GetType().Name}");
		var input = await Input;
		Console.WriteLine($"Got input for {Step.GetType().Name}");
		
		return await Step.ProcessAsync(input);
	}

	// Used for the first step: the receiver is the pipeline's plain input value.
	public async static Task<TOut> Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		Console.WriteLine($"Got input for {Step.GetType().Name}");
		return await Step.ProcessAsync(Input);
	}
}

and the individual example steps end up looking like:

/// <summary>Demo step (final version): downloads the page at the given URI as a string.</summary>
public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
	private static readonly HttpClient _client = new HttpClient();

	public async Task<string> ProcessAsync(Uri Input)
	{
		Console.WriteLine("Entering HttpFetchAsyncStep #1");
		
		// Logged before the download below is awaited.
		Console.WriteLine("Leaving HttpFetchAsyncStep #1");
		
		return await _client.GetStringAsync(Input);
	}
}

/// <summary>Demo step (final version): replaces every "BBC" in the text with "Not the BBC".</summary>
public class ModifyTextAsyncStep : IAsyncPipelineStep<string, string>
{
	// NOTE(review): no await inside this async method, so the compiler warns (CS1998) and it completes synchronously.
	public async Task<string> ProcessAsync(string Input)
	{
		Console.WriteLine("Entering ModifyTextAsyncStep #2");

		var output = Input.Replace("BBC", "Not the BBC");

		Console.WriteLine("Leaving ModifyTextAsyncStep #2");

		return output;
	}
}

/// <summary>Demo step (final version): writes the text to test.txt on the current user's desktop and returns its path.</summary>
public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(string Input)
	{
		Console.WriteLine("Entering DiskWriteAsyncStep #3");
		
		// Resolved per user, replacing the hard-coded path of the previous version.
		var desktopFolder = System.Environment.GetFolderPath(Environment.SpecialFolder.Desktop);
		var fileName =  System.IO.Path.Combine(desktopFolder, "test.txt");

		await System.IO.File.WriteAllTextAsync(fileName, Input);

		Console.WriteLine("Leaving DiskWriteAsyncStep #3");

		return fileName;
	}
}

That gives the same overall behaviour, but doesn't require an abstract base for pipeline steps, and doesn't suffer from the "putting code before the await for the input" issue I mentioned before.

@wfigueiredo
Copy link

Hi Jeremy!
Sorry for the delay... I've not been feeling well the last few days (not Covid, though)

Yeah, you nailed it: it was not the order, but the async behaviour instead (wow, I'm really confused by all of this, lol...)
As I said, I'm new to C#/.NET so thanks again for your patience.

I included your latest suggestion for AsyncPipelineStepExtensions and it now works beautifully!
I think you really should include this Async approach to your gists... It helped me a lot, and can be of use for other devs too!
Amazing job, I really appreciate it! =D

gonna keep testing new scenarios, but I think most of the infrastructure is now solid enough to go on.
Thanks!!!! =)

@jermdavis
Copy link
Author

Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?

Glad you're getting on top of all this though - happy to have been able to help. Certainly not going to waste all this interesting material - what I've worked out will end up on my blog and I'll add new gists for the changed code as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment