Skip to content

Instantly share code, notes, and snippets.

@jermdavis
Last active June 11, 2021 07:48
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
Extending the code-first strongly typed pipelines to show how they might be loaded from config
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// Non-generic marker interface implemented by every pipeline step.
/// It is required so that reflection code can create a Step from its type name, and hold it
/// in a collection, without needing to understand its generic parameters first.
/// It declares no members of its own.
/// </summary>
public interface IPipelineStep
{
}
/// <summary>
/// Base type for individual pipeline steps.
/// Descendants of this type map an input value to an output value.
/// The input and output types can differ.
/// </summary>
/// <typeparam name="INPUT">Type of the value the step consumes.</typeparam>
/// <typeparam name="OUTPUT">Type of the value the step produces.</typeparam>
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
/// <summary>
/// Maps a single input value to its output value.
/// </summary>
/// <param name="input">The value to transform.</param>
/// <returns>The transformed value.</returns>
OUTPUT Process(INPUT input);
}
/// <summary>
/// Fluent glue for chaining pipeline steps: any value can be piped into a step with
/// <c>value.Step(step)</c>, so a sequence of steps reads as a left-to-right data flow.
/// </summary>
public static class PipelineStepExtensions
{
    /// <summary>
    /// Feeds <paramref name="input"/> into <paramref name="step"/> and hands back the step's result.
    /// </summary>
    /// <param name="input">The value flowing through the pipeline.</param>
    /// <param name="step">The step that transforms the value.</param>
    /// <returns>Whatever <c>step.Process(input)</c> returns.</returns>
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
        => step.Process(input);
}
/// <summary>
/// The base type for a complete pipeline.
/// Descendant types can use their constructor to compile a set of PipelineSteps together
/// using the PipelineStepExtensions.Step() method, and assign the result to the PipelineSteps
/// property here.
/// The initial and final types of the set of steps must match the input and output types of this class,
/// but the intermediate types can vary.
/// Because a pipeline is itself an IPipelineStep, it can be nested inside another pipeline.
/// </summary>
/// <typeparam name="INPUT">Type of the value fed into the first step.</typeparam>
/// <typeparam name="OUTPUT">Type of the value produced by the last step.</typeparam>
public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    /// <summary>
    /// The composed step chain. Descendants are expected to assign this in their constructor.
    /// </summary>
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    /// <summary>
    /// Runs <paramref name="input"/> through the composed step chain.
    /// </summary>
    /// <param name="input">The value to process.</param>
    /// <returns>The output of the final step.</returns>
    /// <exception cref="InvalidOperationException">
    /// The descendant type never assigned <see cref="PipelineSteps"/>.
    /// </exception>
    public OUTPUT Process(INPUT input)
    {
        Func<INPUT, OUTPUT> steps = PipelineSteps;
        if (steps == null)
        {
            // Fail with a message that names the misconfigured pipeline rather than
            // letting the delegate invocation raise an anonymous NullReferenceException.
            throw new InvalidOperationException(
                string.Format("{0} has no PipelineSteps assigned; set the property in the pipeline's constructor.", GetType().Name));
        }

        return steps(input);
    }
}
}
namespace StronglyTypedPipelines
{
/// <summary>
/// Example pipeline: int -> doubled int -> string -> "text.text" -> float.
/// </summary>
public class BasicPipeline : Pipeline<int, float>
{
    public BasicPipeline()
    {
        // The step objects are created inside the delegate, i.e. afresh on every Process() call.
        PipelineSteps = input =>
        {
            int doubled = input.Step(new DoubleStep());
            string text = doubled.Step(new ToStringStep());
            string repeated = text.Step(new DuplicateStep());
            return repeated.Step(new ToFloatStep());
        };
    }
}
/// <summary>
/// Small int -> int pipeline (divide by three, then convert back to int) that is
/// reused as a single step inside other pipelines.
/// </summary>
public class InnerPipeline : Pipeline<int, int>
{
    public InnerPipeline()
    {
        PipelineSteps = input =>
        {
            float third = input.Step(new ThirdStep());
            return third.Step(new RoundStep());
        };
    }
}
/// <summary>
/// Example of nesting: an entire <see cref="InnerPipeline"/> is used as the middle step.
/// </summary>
public class NestedPipeline : Pipeline<int, string>
{
    public NestedPipeline()
    {
        PipelineSteps = input =>
        {
            int doubled = input.Step(new DoubleStep());
            int afterInner = doubled.Step(new InnerPipeline());
            return afterInner.Step(new ToStringStep());
        };
    }
}
/// <summary>
/// Example of conditional flow: an optional step followed by an either/or step.
/// </summary>
public class BranchingPipeline : Pipeline<int, string>
{
    public BranchingPipeline()
    {
        PipelineSteps = input =>
        {
            // Values above 50 go through the inner pipeline; others pass through untouched.
            var maybeInner = new OptionalStep<int, int>(value => value > 50, new InnerPipeline());
            int afterOptional = input.Step(maybeInner);

            // Values above 100 are halved; everything else is doubled.
            var halveOrDouble = new ChoiceStep<int, int>(value => value > 100, new HalfStep(), new DoubleStep());
            int afterChoice = afterOptional.Step(halveOrDouble);

            return afterChoice.Step(new ToStringStep());
        };
    }
}
}
namespace StronglyTypedPipelines
{
/// <summary>Step that multiplies an integer by two.</summary>
public class DoubleStep : IPipelineStep<int, int>
{
    /// <summary>Returns <paramref name="input"/> times two.</summary>
    public int Process(int input) => input * 2;
}
/// <summary>Step that halves an integer using integer division.</summary>
public class HalfStep : IPipelineStep<int, int>
{
    /// <summary>Returns <paramref name="input"/> divided by two; any remainder is discarded.</summary>
    public int Process(int input) => input / 2;
}
/// <summary>Step that divides an integer by three, producing a float.</summary>
public class ThirdStep : IPipelineStep<int, float>
{
    /// <summary>Returns one third of <paramref name="input"/> as a single-precision value.</summary>
    public float Process(int input) => input / 3f;
}
/// <summary>Step that converts a float to an int.</summary>
public class RoundStep : IPipelineStep<float, int>
{
    /// <summary>
    /// Converts <paramref name="input"/> with a plain cast. Despite the step's name this
    /// drops the fractional part (truncation towards zero) rather than rounding to nearest.
    /// </summary>
    public int Process(float input) => (int)input;
}
/// <summary>Step that converts an integer to its text form.</summary>
public class ToStringStep : IPipelineStep<int, string>
{
    /// <summary>Returns <c>input.ToString()</c>.</summary>
    public string Process(int input) => input.ToString();
}
/// <summary>Step that repeats its input text twice, separated by a dot.</summary>
public class DuplicateStep : IPipelineStep<string, string>
{
    /// <summary>Returns <paramref name="input"/>, a ".", and <paramref name="input"/> again, e.g. "98" becomes "98.98".</summary>
    public string Process(string input) => string.Concat(input, ".", input);
}
/// <summary>Step that parses text into a float.</summary>
public class ToFloatStep : IPipelineStep<string, float>
{
    /// <summary>
    /// Parses <paramref name="input"/> as a single-precision number.
    /// </summary>
    /// <param name="input">Text such as "98.98", using "." as the decimal separator.</param>
    /// <returns>The parsed value.</returns>
    /// <exception cref="System.FormatException"><paramref name="input"/> is not a valid number.</exception>
    public float Process(string input)
    {
        // Parse with the invariant culture: the text handed to this step (see DuplicateStep) always
        // uses "." as the separator, which the current culture may treat as a thousands separator
        // (silently yielding the wrong number) or reject outright.
        return float.Parse(input, System.Globalization.CultureInfo.InvariantCulture);
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
/// <summary>
/// A pipeline whose steps are listed in XML rather than compiled in code.
/// Each <c>&lt;step type="..."/&gt;</c> child of the supplied element names a step type; the steps are
/// instantiated via reflection, checked so that every step's input type equals the previous
/// step's output type, and then invoked in document order by <see cref="Pipeline{INPUT, OUTPUT}.Process"/>.
/// </summary>
/// <typeparam name="INPUT">Type of the value fed into the first step.</typeparam>
/// <typeparam name="OUTPUT">Type of the value the last step must produce.</typeparam>
public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT>
{
    /// <summary>
    /// Builds and validates the pipeline described by <paramref name="pipelineXml"/>.
    /// </summary>
    /// <param name="pipelineXml">Element whose <c>step</c> children list the step types in order.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipelineXml"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A step element is malformed, names an unusable type, or the step types do not chain together.
    /// </exception>
    public ConfigBasedPipeline(XElement pipelineXml)
    {
        if (pipelineXml == null)
        {
            throw new ArgumentNullException(nameof(pipelineXml));
        }

        IList<IPipelineStep> pipelineSteps = parsePipelineSteps(pipelineXml);
        validatePipelineSteps(pipelineSteps);

        // Resolve each step's Process method once, instead of on every Process() call.
        IList<MethodInfo> processMethods = pipelineSteps
            .Select(step => getStepInterface(step).GetMethod("Process"))
            .ToList();

        PipelineSteps = input => processPipelineSteps(pipelineSteps, processMethods, input);
    }

    /// <summary>
    /// Pushes <paramref name="input"/> through every step in order, passing each step's result to the next.
    /// </summary>
    private OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, IList<MethodInfo> processMethods, INPUT input)
    {
        object output = input;
        for (int i = 0; i < pipelineSteps.Count; i++)
        {
            try
            {
                output = processMethods[i].Invoke(pipelineSteps[i], new[] { output });
            }
            catch (TargetInvocationException ex) when (ex.InnerException != null)
            {
                // Reflection wraps whatever the step threw. Re-raise the step's own exception
                // (with its original stack trace) so a configured pipeline fails the same way
                // as the equivalent code-first pipeline would.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
                throw; // Not reached; Throw() never returns.
            }
        }
        return (OUTPUT)output;
    }

    /// <summary>
    /// Instantiates one step per <c>step</c> child element, in document order.
    /// </summary>
    private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
    {
        var pipeline = new List<IPipelineStep>();
        foreach (var xStep in pipelineXml.Elements("step"))
        {
            // The explicit conversion yields null (rather than throwing) when the attribute is absent.
            string typeName = (string)xStep.Attribute("type");
            if (string.IsNullOrWhiteSpace(typeName))
            {
                throw new InvalidOperationException(
                    string.Format("Step #{0} is missing its required 'type' attribute.", pipeline.Count + 1));
            }

            Type type = Type.GetType(typeName);
            if (type == null)
            {
                throw new InvalidOperationException(
                    string.Format("Step #{0} type '{1}' could not be loaded.", pipeline.Count + 1, typeName));
            }

            if (!typeof(IPipelineStep).IsAssignableFrom(type))
            {
                throw new InvalidOperationException(
                    string.Format("Step #{0} type {1} does not implement IPipelineStep.", pipeline.Count + 1, type.Name));
            }

            ConstructorInfo ctr = type.GetConstructor(Type.EmptyTypes);
            if (ctr == null)
            {
                throw new InvalidOperationException(
                    string.Format("Step #{0} type {1} has no public parameterless constructor.", pipeline.Count + 1, type.Name));
            }

            pipeline.Add((IPipelineStep)ctr.Invoke(new object[0]));
        }
        return pipeline;
    }

    /// <summary>
    /// Checks that the steps form an unbroken INPUT -> ... -> OUTPUT chain of exactly matching types.
    /// </summary>
    /// <exception cref="InvalidOperationException">The chain is broken or does not end in OUTPUT.</exception>
    private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps)
    {
        int stepNumber = 0;
        Type currentInputType = typeof(INPUT);
        Type outputType = typeof(OUTPUT);

        foreach (var step in pipelineSteps)
        {
            stepNumber += 1;

            Type stepBaseInterface = getStepInterface(step);
            Type stepInType = stepBaseInterface.GenericTypeArguments[0];
            Type stepOutType = stepBaseInterface.GenericTypeArguments[1];

            if (currentInputType != stepInType)
            {
                string msg = "Step #{0} {1} input type {2} does not match current type {3}.";
                throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name));
            }

            currentInputType = stepOutType;
        }

        if (currentInputType != outputType)
        {
            if (stepNumber == 0)
            {
                // No steps at all: there is no "final step" to name in the message below.
                string emptyMsg = "Pipeline has no steps, so input type {0} cannot produce output type {1}.";
                throw new InvalidOperationException(string.Format(emptyMsg, currentInputType.Name, outputType.Name));
            }

            string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}.";
            throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name));
        }
    }

    /// <summary>
    /// Finds the single closed <c>IPipelineStep&lt;,&gt;</c> interface implemented by <paramref name="step"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The step implements the generic interface zero times or more than once.
    /// </exception>
    private static Type getStepInterface(IPipelineStep step)
    {
        Type stepType = step.GetType();
        Type[] candidates = stepType.GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IPipelineStep<,>))
            .ToArray();

        if (candidates.Length == 0)
        {
            throw new InvalidOperationException(
                string.Format("Step {0} does not implement IPipelineStep<INPUT, OUTPUT>.", stepType.Name));
        }

        if (candidates.Length > 1)
        {
            throw new InvalidOperationException(
                string.Format("Step {0} implements IPipelineStep<INPUT, OUTPUT> more than once, so its input and output types are ambiguous.", stepType.Name));
        }

        return candidates[0];
    }
}
/// <summary>
/// Concrete config-driven pipeline that takes an int and produces a string.
/// All behaviour comes from the base class; this type only fixes the generic arguments.
/// </summary>
public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string>
{
/// <summary>
/// Forwards <paramref name="pipelineXml"/> to the base constructor.
/// </summary>
/// <param name="pipelineXml">Element whose <c>step</c> children list the step types.</param>
public ExampleConfigBasedPipeline(XElement pipelineXml) : base(pipelineXml)
{
}
}
}
<?xml version="1.0" encoding="utf-8" ?>
<!-- Step list for a config-based pipeline. Steps run in document order, and each step's input
     type must equal the previous step's output type.
     NOTE(review): presumably this is the ConfigBasedPipeline.xml loaded by Program, which builds
     an int-in / string-out ExampleConfigBasedPipeline from it; confirm the file name. -->
<pipeline name="example">
<step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" />
<!-- Will error (ThirdStep outputs float, but the following ToStringStep takes int): <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> -->
<step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" />
<step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" />
<!-- Will error (ToFloatStep outputs float, which is not a string-output pipeline's result type): <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> -->
</pipeline>
using System;
namespace StronglyTypedPipelines
{
/// <summary>
/// Wraps another step so that it only runs when a predicate on the input is true.
/// When the predicate is false the input itself is returned, which is why INPUT must be
/// convertible to OUTPUT.
/// </summary>
public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private readonly Func<INPUT, bool> _choice;
    private readonly IPipelineStep<INPUT, OUTPUT> _step;

    /// <param name="choice">Predicate deciding whether <paramref name="step"/> runs.</param>
    /// <param name="step">The step to run when the predicate is true.</param>
    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        _step = step;
        _choice = choice;
    }

    /// <summary>
    /// Returns the wrapped step's result when the predicate accepts <paramref name="input"/>;
    /// otherwise returns <paramref name="input"/> unchanged.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        if (!_choice(input))
        {
            return input;
        }

        return _step.Process(input);
    }
}
/// <summary>
/// Routes the input to one of two alternative steps depending on a predicate.
/// Unlike <c>OptionalStep</c>, the input is never returned directly, so INPUT and OUTPUT
/// may be unrelated types.
/// </summary>
/// <typeparam name="INPUT">Type of the value both alternatives consume.</typeparam>
/// <typeparam name="OUTPUT">Type of the value both alternatives produce.</typeparam>
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly IPipelineStep<INPUT, OUTPUT> _first;
    private readonly IPipelineStep<INPUT, OUTPUT> _second;
    private readonly Func<INPUT, bool> _choice;

    /// <param name="choice">Predicate evaluated against each input.</param>
    /// <param name="first">Step used when <paramref name="choice"/> returns true.</param>
    /// <param name="second">Step used when <paramref name="choice"/> returns false.</param>
    public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second)
    {
        _choice = choice;
        _first = first;
        _second = second;
    }

    /// <summary>
    /// Processes <paramref name="input"/> with the first step when the predicate is true,
    /// otherwise with the second step.
    /// </summary>
    public OUTPUT Process(INPUT input)
    {
        return _choice(input)
            ? _first.Process(input)
            : _second.Process(input);
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Xml.Linq;
namespace StronglyTypedPipelines
{
/// <summary>
/// Console demo: runs each example pipeline and prints its input and output values
/// together with their runtime type names.
/// </summary>
class Program
{
    /// <summary>
    /// Writes a line of the form "Input Value: 49 [Int32]".
    /// </summary>
    /// <param name="label">"Input" or "Output".</param>
    /// <param name="value">The value to display; its runtime type name is shown in brackets.</param>
    private static void WriteValue(string label, object value)
    {
        Console.WriteLine(string.Format("{0} Value: {1} [{2}]", label, value, value.GetType().Name));
    }

    /// <summary>Runs <see cref="BasicPipeline"/> against a single value.</summary>
    private static void BasicPipelineTest()
    {
        Console.WriteLine("Basic Pipeline Test");

        var input = 49;
        WriteValue("Input", input);

        var pipeline = new BasicPipeline();
        var output = pipeline.Process(input);
        WriteValue("Output", output);

        Console.WriteLine();
    }

    /// <summary>Runs <see cref="NestedPipeline"/> against a single value.</summary>
    private static void NestedPipelineTest()
    {
        Console.WriteLine("Nested Pipeline Test");

        var input = 103;
        WriteValue("Input", input);

        var pipeline = new NestedPipeline();
        var output = pipeline.Process(input);
        WriteValue("Output", output);

        Console.WriteLine();
    }

    /// <summary>Runs <see cref="BranchingPipeline"/> against several values to exercise each branch.</summary>
    private static void BranchingPipelineTest()
    {
        Console.WriteLine("Branching Pipeline Test");

        foreach (int input in new int[] { 1, 10, 100, 1000 })
        {
            WriteValue("Input", input);

            var pipeline = new BranchingPipeline();
            var output = pipeline.Process(input);
            WriteValue("Output", output);
        }

        Console.WriteLine();
    }

    /// <summary>
    /// Builds <see cref="ExampleConfigBasedPipeline"/> from ConfigBasedPipeline.xml and runs it.
    /// Any failure (unreadable config, invalid step chain, step error) is reported on the console.
    /// </summary>
    private static void ModifiablePipelineTest()
    {
        Console.WriteLine("Configured Pipeline Test");

        var input = 13;
        WriteValue("Input", input);

        try
        {
            // Loading happens inside the try so that a missing or malformed config file is
            // reported like any other configuration error instead of crashing the demo.
            XDocument xd = XDocument.Load("ConfigBasedPipeline.xml");

            //
            // Patching the configuration data would go here
            //

            XElement pipelineXml = xd.Element("pipeline");

            var pipeline = new ExampleConfigBasedPipeline(pipelineXml);
            var output = pipeline.Process(input);
            WriteValue("Output", output);
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR: " + ex.Message);
        }

        Console.WriteLine();
    }

    /// <summary>Runs all four demos in sequence.</summary>
    static void Main(string[] args)
    {
        BasicPipelineTest();
        NestedPipelineTest();
        BranchingPipelineTest();
        ModifiablePipelineTest();
    }
}
}
@wfigueiredo
Copy link

wfigueiredo commented May 27, 2021

Excellent work!
I'm new to C# and currently trying to improve this to an Async approach! =D

Here's what I did so far:


public interface IAsyncPipelineStep< TIn, TOut >
{
Task< TOut > ProcessAsync(TIn Input);
}

public static class AsyncPipelineStepExtensions
{
public static Task< TOut > Step< TIn, TOut >(this TIn Input, IAsyncPipelineStep< TIn, TOut > Step)
{
return Step.ProcessAsync(Input);
}
}

public abstract class AsyncPipeline< TIn, TOut > : IAsyncPipelineStep< TIn, TOut >
{
public Func< TIn, Task > _pipelineSteps { get; protected set; }

  public Task< TOut > ProcessAsync(TIn Input)
  {
      return _pipelineSteps(Input);
  }

}


public class CreateBillingAccountPipelineStep : IAsyncPipelineStep< BillingAccountDto, BillingAccount >
{
private readonly IBillingAccountService _billingAccountService;

    public CreateBillingAccountPipelineStep(IBillingAccountService billingAccountService)
    {
         _billingAccountService = billingAccountService;
    }

    public async Task<BillingAccount> ProcessAsync(BillingAccountDto Input)
    {    
        return await _billingAccountService.Create(Input.ToDomain());
    }
}

public class BillingCreatePipelineAsync : AsyncPipeline< BillingAccountDto, BillingAccount >
{
private readonly CreateBillingAccountPipelineStep _step1;

     public BillingCreatePipelineAsync(CreateBillingAccountPipelineStep step1){
        _step1 = step1;
        _pipelineSteps = Input => Input
            .Step(_step1);
     }

@wfigueiredo
Copy link

But I get the following compile error:

"The type arguments for method AsyncPipelineStepExtensions.Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step) cannot be inferred for the usage. Try specifying the type arguments explicitly."

I don't really get what's wrong, since the entire Step chain is awaited somehow.

Any ideas what am I missing?

Thanks in advance!

@jermdavis
Copy link
Author

Async can be a difficult beast @wfigueiredo ;-) Once you start on it, it tends to spread across your codebase...

So I don't have all your code, but I think I can see a possible issue there. When I copy your example above into LinqPad and try to compile it I need to do two things to get rid of errors. First is create your BillingAccount classes. Since they're not included in your example, I had to guess - but just to make something that can compile I added this:

public class BillingAccountDto 
{ 
	public BillingAccount ToDomain()
	{
		return new BillingAccount();
	} 
}

public class BillingAccount
{
}

public interface IBillingAccountService
{
	Task<BillingAccount> Create(BillingAccount b); 
}

Doesn't do anything - but provides signatures which allow the rest of the code to compile.

The second thing seems like it might be an actual issue. In the AsyncPipeline<TIn, TOut> class, your example above is:

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<TIn, Task> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(TIn Input)
	{
		return _pipelineSteps(Input);
	}
}

That gets a compiler error for me on the return statement: "CS0266 Cannot implicitly convert type 'System.Threading.Tasks.Task' to 'System.Threading.Tasks.Task<TOut>'. An explicit conversion exists (are you missing a cast?)"

The reason for that is you've declared _pipelineSteps as Func<TIn, Task>, but you've declared ProcessAsync() to return Task<TOut>. You need the return type of your pipeline step functions to be a Task<TOut> to remove the need for any type conversion. The compiler won't implicitly convert a plain Task to a Task<TOut>.

So if I change that class above to be

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(TIn Input)
	{
		return _pipelineSteps(Input);
	}
}

Then it compiles without error for me.

If I infer a bit more code for what a "does nothing" IBillingAccountService might look like:

public class BillingAccountService : IBillingAccountService
{
	public async Task<BillingAccount> Create(BillingAccount b)
	{
		 return await Task.FromResult(default(BillingAccount));
	}
}

Then I can run this without errors too:

// Demo entry point: builds the single-step pipeline and awaits its result.
// Declared as "async Task" rather than "async void" so that the caller can await it and
// any exception thrown by the pipeline is observed instead of being lost.
async Task Main()
{
	var bas = new BillingAccountService();
	var pl = new BillingCreatePipelineAsync(new CreateBillingAccountPipelineStep(bas));
	
	var dto = new BillingAccountDto();
	
	var result = await pl.ProcessAsync(dto);
}

But obviously, that doesn't include whatever code you have for these extra bits I've inferred - but maybe it helps you spot your issue?

@wfigueiredo
Copy link

Hi Jeremy, sorry for the delay. Firewall issues, seems to be ok now.

Well... Regarding the code, I still have compiler issues... 😥
The async pipeline structure seems to be ok, though.
Maybe it's somewhat related to my usage.
Gonna post again, with more details, so maybe you could point it more accurately than me:

IAsyncPipelineStep

public interface IAsyncPipelineStep<TIn, TOut>
  {
      Task<TOut> ProcessAsync(TIn Input);
  }

AsyncPipeline

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
  {
      public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }

      public Task<TOut> ProcessAsync(TIn Input)
      {
          return _pipelineSteps(Input);
      }
  }

AsyncPipelineStepExtensions

public static class AsyncPipelineStepExtensions
  {
      public static Task<TOut> Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step)
      {
          return Step.ProcessAsync(Input);
      }
  }

Now for the usage:

CreateBillingAccountPipelineStep (Step 1)

public class CreateBillingAccountPipelineStep : IAsyncPipelineStep<BillingAccountDto, BillingAccount>
  {
      private readonly IBillingAccountService _billingAccountService;

      public CreateBillingAccountPipelineStep(IBillingAccountService billingAccountService)
      {
          _billingAccountService = billingAccountService;
      }

      public async Task<BillingAccount> ProcessAsync(BillingAccountDto Input)
      {
         return await _billingAccountService.Create(Input.ToDomain());  // simple service method which persists object in db
      }
  }

CreateFeeTicketsPipelineStep (Step 2)

public class CreateFeeTicketsPipelineStep : IAsyncPipelineStep<BillingAccount, IList<FeeTicket>>
  {
      private readonly IComplementWorkflowService<BillingAccount, FeeTicket> _workflowComplementService;

      public CreateFeeTicketsPipelineStep(IComplementWorkflowService<BillingAccount, FeeTicket> workflowComplementService)
      {
          _workflowComplementService = workflowComplementService;
      }

      public async Task<IList<FeeTicket>> ProcessAsync(BillingAccount Input)
      {
          return await _workflowComplementService.Complement(Input);
      }
  }

Now, to chain these 2 steps, I created the following class:

BillingCreatePipelineAsync

public class BillingCreatePipelineAsync : AsyncPipeline<BillingAccountDto, BillingAccount>
  {
      private readonly CreateBillingAccountPipelineStep _step1;
      private readonly CreateFeeTicketsPipelineStep _step2;

      public BillingCreatePipelineAsync(
          CreateBillingAccountPipelineStep step1,
          CreateFeeTicketsPipelineStep step2,
          )
      {
          _step1 = step1;
          _step2 = step2;

          _pipelineSteps = Input => Input
              .Step(_step1)
              .Step(_step2);   // <-- compiler error goes in this line [1]
      }
  }

[1] "The type arguments for method AsyncPipelineStepExtensions.Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step) cannot be inferred for the usage. Try specifying the type arguments explicitly."

Tried all day but still cannot point out exactly what is happening.
I think it happens when 2 or more steps are chained together.

Any help would be much appreciated. Thanks a lot! ☺

@jermdavis
Copy link
Author

Right - so pretty sure the problem here is going to be related to how "async" patterns tend to flow out through code. You've got a pattern for a generic async pipeline step:

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(TIn Input)
	{
		return _pipelineSteps(Input);
	}
}

So as soon as you want to chain steps you hit a bit of a problem: The first step is Task<TOut> ProcessAsync(TIn Input) which is fine - but then you need to pass that Task<TOut> as the input to another step. But the input of the next step doesn't expect a Task<T> in, it just expects a T.

So the not-very-helpful compiler error here is caused by the compiler saying "You've asked me to chain these two pipeline steps, but the output of the first step does not match any overload I have for the input of the second step".

_pipelineSteps = Input => Input
	.Step(_step1)    // step declared as "BillingAccountDto ==> Task<BillingAccount>" - OK
	.Step(_step2);   // step declared as "BillingAccount ==> Task<IList<FeeTicket>>" - Fail 
                         // wants BillingAccount in, but gets Task<BillingAccount> instead.

As for how to fix that... That's more complex. ;-)

Either you need each pipeline step to resolve its internal async tasks to a concrete result (not a Task<T>), to make "a synchronous pipeline which happens to call some async things internally". Or you need to make all your steps take Task<T> as input and return Task<T> as an output, so you get a pipeline that is overall async.

I think the second of those is what you're aiming for here? So that involves a bit of change to what you wrote:

First up, all the base definitions for the abstract pipeline need to change so that they are taking tasks in and out:

public interface IAsyncPipelineStep<TIn, TOut>
{
	Task<TOut> ProcessAsync(Task<TIn> Input);
}

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<Task<TIn>, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		return _pipelineSteps(Input);
	}
}

public static class AsyncPipelineStepExtensions
{
	public static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		return Step.ProcessAsync(Input);
	}
}

And then from there, those changes need to ripple down to the concrete classes. Firstly, for the pipeline steps:

public class CreateBillingAccountPipelineStep : IAsyncPipelineStep<BillingAccountDto, BillingAccount>
{
	private readonly IBillingAccountService _billingAccountService;

	public CreateBillingAccountPipelineStep(IBillingAccountService billingAccountService)
	{
		_billingAccountService = billingAccountService;
	}

	public async Task<BillingAccount> ProcessAsync(Task<BillingAccountDto> Input)
	{
		var i = await Input;
		return await _billingAccountService.Create(i.ToDomain());  // simple service method which persists object in db
	}
}

public class CreateFeeTicketsPipelineStep : IAsyncPipelineStep<BillingAccount, IList<FeeTicket>>
{
	private readonly IComplementWorkflowService<BillingAccount, FeeTicket> _workflowComplementService;

	public CreateFeeTicketsPipelineStep(IComplementWorkflowService<BillingAccount, FeeTicket> workflowComplementService)
	{
		_workflowComplementService = workflowComplementService;
	}

	public async Task<IList<FeeTicket>> ProcessAsync(Task<BillingAccount> Input)
	{
		var i = await Input;
		return await _workflowComplementService.Complement(i);
	}
}

There are two changes here. First their signatures change to match the definitions above. But secondly, note that in the process step we're getting an input that is a Task<T> - the promise of a T in the future. But we can't continue until that's available. So we need to await the input, and then use that value for the onward processing.

That's the same change in both steps - and in fact it's probably necessary in all steps. So chances are there's a way of abstracting that somehow. But for simplicity, I'll leave it as is for the moment.

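All those changes flow through to the overall pipeline object. Its constructor body doesn't change, but note that its declared output type has to be IList<FeeTicket> (the output of the last step) rather than BillingAccount: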

public class BillingCreatePipelineAsync : AsyncPipeline<BillingAccountDto, IList<FeeTicket>>
{
	private readonly CreateBillingAccountPipelineStep _step1;
	private readonly CreateFeeTicketsPipelineStep _step2;

	public BillingCreatePipelineAsync(
		CreateBillingAccountPipelineStep step1,
		CreateFeeTicketsPipelineStep step2
		)
	{
		_step1 = step1;
		_step2 = step2;

		_pipelineSteps = Input => Input
			.Step(_step1)
			.Step(_step2);   // No error here now!
	}
}

And that should all compile...

But there is another change that's required. The pipeline definition in the previous code snippet hasn't changed, but its underlying definition has changed because it's using our new task-in-task-out pattern. That means whatever code calls the pipeline needs a little tweak:

async Task Main()
{
	var bas = new BillingAccountService();
	var wcs = new ComplementWorkflowService();
	var pl = new BillingCreatePipelineAsync(new CreateBillingAccountPipelineStep(bas),new CreateFeeTicketsPipelineStep(wcs));

	var dto = new BillingAccountDto();
	var dtoTask = Task.FromResult(dto); // <-- this lets us get a Task for something we already have

	var result = await pl.ProcessAsync(dtoTask);
}

As above, I've knocked up some guesses at what the classes you didn't give me were:

public class BillingAccountService : IBillingAccountService
{
	public async Task<BillingAccount> Create(BillingAccount b)
	{
		return await Task.FromResult(default(BillingAccount));
	}
}

public class ComplementWorkflowService : IComplementWorkflowService<BillingAccount, FeeTicket>
{
	public async Task<IList<FeeTicket>> Complement(BillingAccount account)
	{
		return await Task.FromResult(new List<FeeTicket>());
	}
}

public class BillingAccountDto
{
	public BillingAccount ToDomain()
	{
		return new BillingAccount();
	}
}

public class BillingAccount
{
}

public class FeeTicket
{
}

And with all that put together in LinqPad it compiles and runs for me. (Assuming I've remembered to copy all these bits of code into this comment, it should for you too - * crosses fingers *)

And hopefully that helps you understand what was wrong with your code?

@wfigueiredo
Copy link

Hi Jeremy, how's it going?

Sorry for the delay: I've been doing some tinkering in my code regarding the async pipeline approach.
Much of the infrastructure had to be changed, so I took the proper time.

No compile errors now, but... Some other issues are going on. 😥

Main one is that I noticed that steps are being executed out of order! (as crazy as it may seem...)

I did change the AsyncPipeline extensions to this:

AsyncPipelineStepExtensions

public static class AsyncPipelineStepExtensions
  {
      public async static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
      {
          return await Step.ProcessAsync(Input);
      }
  }

expecting that it should be somewhat related to lack of "Step await", but no progress at all... 😓

Have you tried an async approach yourself?
I didn't expect a challenge like this in C# if you ask me... LoL

Again, thanks a bunch for the cooperation!

@wfigueiredo
Copy link

public class BillingCreatePipelineAsync : AsyncPipeline<BillingAccountDto, BillingAccount>
  {
      private readonly CreateBillingAccountPipelineStep _step1;
      private readonly CreateFeeTicketsPipelineStep _step2;
      private readonly PublishFeeTicketsPipelineStep _step3;
      private readonly UpdateBillingAccountSentStep _step4;

      public BillingCreatePipelineAsync(
          CreateBillingAccountPipelineStep step1,
          CreateFeeTicketsPipelineStep step2,
          PublishFeeTicketsPipelineStep step3,
          UpdateBillingAccountSentStep step4)
      {
          _step1 = step1;
          _step2 = step2;
          _step3 = step3;
          _step4 = step4;

          _pipelineSteps = Input => Input
              .Step(_step1)
              .Step(_step2)
              .Step(_step3)
              .Step(_step4);
      }
  }

In this case, the execution order is being triggered like this:

Step1 --> Step3 --> Step4 --> Step2 😨😨😨

@jermdavis
Copy link
Author

Ok, so two things:

Firstly, making the AsyncPipelineStepExtensions async: I'm not sure you need that? Given the step interface still has task-in-task-out, I think you just end up awaiting the same task twice there? Once in the extension method, and once in the next step? I don't think that matters because the first await will resolve the Task's value, and the second will then effectively do nothing because the Task already has its value. But I think the code works without it? Methods with a return type of Task<T> but no async modifier just return the task - they don't do any complicated compiler business to implement asynchronous behaviour. And in this case I think that's fine?

Secondly, ordering: Are the steps really happening out-of-order, or are you seeing the effects of how the whole async thing works? The internals of what the compiler does with async code are pretty complex, but it boils down to "while an await statement is waiting for the future-value of a Task, let's try to run some other code if we can". Internally this is done with complicated state machines that allow you to write linear-looking code, while behind the scenes the compiler generates wrappers that allow that flow of execution to work despite there being the old pattern of "BeginSomething()" / "EndSomething()" asynchronicity under the hood.

(BTW I don't consider myself too knowledgeable on how this stuff works - so it's possible I'm misunderstanding stuff here - but this is how I think it works...)

And this has turned into a bit of an epic answer - sorry ;-)

Anyway, I think we can see the outline of that behaviour in a simple example. If I create three steps which look like this:

public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
	private static readonly HttpClient _client = new HttpClient();

	public async Task<string> ProcessAsync(Task<Uri> Input)
	{
		Console.WriteLine("Entering Step 1");
		
		var uri = await Input;
		Console.WriteLine("HaveInput Step 1");
		
		Console.WriteLine("Leaving Step 1");
		
		return await _client.GetStringAsync(uri);
	}
}

The first step does an HTTP fetch for whatever URL comes in, and it outputs some text at three key points: On entry, after the input await and before it returns.

public class ModifyTextAsyncStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		Console.WriteLine("Entering Step 2");
		
		var txt = await Input;
		Console.WriteLine("HaveInput Step 2");
		
		var output = txt.Replace("BBC", "Not the BBC");
		
		Console.WriteLine("Leaving Step 2");
		
		return output;
	}
}

The second step does some text replacement, and outputs similar messages.

public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		Console.WriteLine("Entering Step 3");
		
		var data = await Input;
		Console.WriteLine("HaveInput Step 3");
		
		var fileName = @".\test.txt";

		await System.IO.File.WriteAllTextAsync(fileName, data);

		Console.WriteLine("Leaving Step 3");

		return fileName;
	}
}

And then step three writes the changed data to disk, and does another set of WriteLines in the same pattern.

I don't think the particular bits of code are important - I just wanted something simple-ish to demonstrate the behaviour. I suspect you'd see something similar in your code...

Now, if I make a pipeline from those:

public class ExampleAsyncPipeline : AsyncPipeline<Uri, string>
{
	public ExampleAsyncPipeline()
	{
		_pipelineSteps = input => input
			.Step(new HttpFetchAsyncStep())
			.Step(new ModifyTextAsyncStep())
			.Step(new DiskWriteAsyncStep());

	}
}

It will run a pipeline with the steps in that order, but the output looks like:

Entering Step 1
HaveInput Step 1
Leaving Step 1
Entering Step 2
Entering Step 3
HaveInput Step 2
Leaving Step 2
HaveInput Step 3
Leaving Step 3

So you can see the steps don't happen completely linearly:

  • It calls step one, and that all looks like it happens in order, because its input is an "already completed" task, and it starts the async HTTP fetch, and returns the promise of that response value in the future.
  • The second step starts, and it awaits that input variable. The HTTP fetch hasn't completed here, so this work gets shunted off into the background, and the runtime looks for other work it can do. Interestingly, (confusingly?) that other work is to start step three by passing in a task for "you'll get your data eventually". So it looks like step three starts before step two completes. And step three also gets stuck awaiting its input task. At this point the runtime has to look elsewhere for work to do. (If there is any - it may just block)
  • Then the await for step two's input task completes and provides its value. That allows step two to complete the rest of its processing and return its output.
  • And that allows step three's input await to complete and hence it can do the rest of its processing.

So I think that's why you see apparently out-of-order execution? The runtime behind the async / await calls is making sure that, where necessary, the flow of your program is preserved, but it tries to fill the time during the "waiting for a future value" bits of the long-running operations with other bits of work where it can.

Now thinking about it, I suspect this allows for some odd behaviour in the code here if anything "important" happens before awaiting the input in an individual step. (The delay between that code, and the await completing could lead to deadlocks or other strangeness in some code.) That makes me want to abstract that await out of the step itself and hide it away in some base code. But that's trickier to achieve - as it needs more change. Quick attempt...

First, the pipeline definition needs to allow explicitly for both Task<TIn> and TIn:

/// <summary>
/// An asynchronous pipeline step: takes a <typeparamref name="TIn"/> and returns a task
/// for a <typeparamref name="TOut"/>. Two overloads are declared so a step can be handed
/// either a task for its input or the input value itself.
/// </summary>
public interface IAsyncPipelineStep<TIn, TOut>
{
	/// <summary>Processes an input supplied as a task, returning a task for the output.</summary>
	Task<TOut> ProcessAsync(Task<TIn> Input);
	/// <summary>Processes an input supplied as a plain value, returning a task for the output.</summary>
	Task<TOut> ProcessAsync(TIn Input);
}

We'd already added that signature to AsyncPipeline<TIn, TOut> - so that doesn't change. And we can leave AsyncPipelineStepExtensions too, as it already has all its required methods.

To hide that input await we can have an abstract base for each pipeline step:

/// <summary>
/// Shared base for async pipeline steps. It takes care of an input that arrives as a task,
/// so that concrete steps only have to implement the overload that receives the plain value.
/// </summary>
public abstract class BaseAsyncStep<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	/// <summary>
	/// Implemented by each concrete step to do its work on an input value.
	/// </summary>
	public abstract Task<TOut> ProcessAsync(TIn Input);

	/// <summary>
	/// Awaits the input task, then forwards the resulting value to the value-based overload.
	/// </summary>
	public async Task<TOut> ProcessAsync(Task<TIn> Input) => await ProcessAsync(await Input);
}

That handles the Task<TIn> method by awaiting the input. And it delegates the TIn method to the concrete class. So we can tweak the individual step definitions from my example above:

/// <summary>
/// Example step 1: fetches the content found at a Uri and returns it as a string.
/// </summary>
public class HttpFetchAsyncStep : BaseAsyncStep<Uri, string>
{
	// One client instance, shared by every instance of this step.
	private static readonly HttpClient _client = new HttpClient();

	/// <summary>
	/// Writes the enter/leave trace messages and then awaits the HTTP fetch of <paramref name="Input"/>.
	/// </summary>
	public override async Task<string> ProcessAsync(Uri Input)
	{
		// Both trace messages are written before the request is started.
		Console.WriteLine("Entering Step 1");
		Console.WriteLine("Leaving Step 1");

		var body = await _client.GetStringAsync(Input);
		return body;
	}
}

/// <summary>
/// Example step 2: replaces every "BBC" in the incoming text with "CCB".
/// </summary>
public class ModifyTextAsyncStep : BaseAsyncStep<string, string>
{
	/// <summary>
	/// Returns <paramref name="Input"/> with the replacement applied, writing trace messages
	/// before and after the work.
	/// </summary>
	public override async Task<string> ProcessAsync(string Input)
	{
		// There is no await in this body, so the work runs synchronously; the method stays
		// async so its result (or any exception) is delivered through the returned task.
		Console.WriteLine("Entering Step 2");
		var replaced = Input.Replace("BBC", "CCB");
		Console.WriteLine("Leaving Step 2");

		return replaced;
	}
}

/// <summary>
/// Example step 3: writes the incoming text to "test.txt" on the current user's desktop
/// and returns the full path of the file that was written.
/// </summary>
public class DiskWriteAsyncStep : BaseAsyncStep<string, string>
{
	/// <summary>
	/// Writes <paramref name="Input"/> to disk asynchronously.
	/// </summary>
	/// <returns>A task for the path of the file that was written.</returns>
	public override async Task<string> ProcessAsync(string Input)
	{
		Console.WriteLine("Entering Step 3");

		// Resolve the desktop folder at runtime rather than hard-coding one user's profile
		// path, so the example runs on any machine or account.
		var desktopFolder = Environment.GetFolderPath(Environment.SpecialFolder.Desktop);
		var fileName = System.IO.Path.Combine(desktopFolder, "test.txt");

		await System.IO.File.WriteAllTextAsync(fileName, Input);

		Console.WriteLine("Leaving Step 3");

		return fileName;
	}
}

They override that abstract TIn method now, and never have to await their input.

If I run that, the output becomes

Entering Step 1
Leaving Step 1
Entering Step 2
Leaving Step 2
Entering Step 3
Leaving Step 3

Which makes it look all nice and synchronous. But actually all we did was hide the ordering oddity from above. If I add some extra debug data to the abstract step type:

/// <summary>
/// Shared base for async pipeline steps, with trace output added around the wait for the input.
/// Concrete steps only implement the overload that receives the plain value.
/// </summary>
public abstract class BaseAsyncStep<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	/// <summary>
	/// Implemented by each concrete step to do its work on an input value.
	/// </summary>
	public abstract Task<TOut> ProcessAsync(TIn Input);

	/// <summary>
	/// Awaits the input task, reporting before and after the wait which concrete step is
	/// waiting, then forwards the value to the value-based overload.
	/// </summary>
	public async Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		// The runtime type name identifies the concrete step in the trace output.
		var stepName = GetType().Name;

		Console.WriteLine($"Awaiting input for {stepName}");
		var value = await Input;
		Console.WriteLine($"Got input for {stepName}");

		return await ProcessAsync(value);
	}
}

Then you can see that the ordering still has the whole "await will wander off and do other stuff" behaviour:

Awaiting input for HttpFetchAsyncStep
Got input for HttpFetchAsyncStep
Entering Step 1
Leaving Step 1
Awaiting input for ModifyTextAsyncStep
Awaiting input for DiskWriteAsyncStep
Got input for ModifyTextAsyncStep
Entering Step 2
Leaving Step 2
Got input for DiskWriteAsyncStep
Entering Step 3
Leaving Step 3

Difference here being that it's harder for someone writing a concrete step to accidentally mess up because of that...

Does that make sense?

@jermdavis
Copy link
Author

Actually - having thought about this harder, I think there's a better solution that makes all this simpler. We can move all the logic for handling a Task<TIn> into the extension method class, and simplify everything else...

Instead of the second batch of code from the previous comment...

/// <summary>
/// An asynchronous pipeline step: takes a <typeparamref name="TIn"/> value and returns
/// a task for a <typeparamref name="TOut"/>.
/// </summary>
public interface IAsyncPipelineStep<TIn, TOut>
{
	/// <summary>Processes the given input value, returning a task for the output.</summary>
	Task<TOut> ProcessAsync(TIn Input);
}

/// <summary>
/// Base type for a complete async pipeline. The whole pipeline is held as a single
/// function from <typeparamref name="TIn"/> to a task of <typeparamref name="TOut"/>,
/// and the type is itself a step, so it can be used wherever a step is expected.
/// </summary>
public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	/// <summary>
	/// The function that runs the pipeline. Only this type and its descendants can assign it.
	/// </summary>
	public Func<TIn, Task<TOut>> _pipelineSteps { get; protected set; }

	/// <summary>
	/// Runs the pipeline by invoking the stored function with <paramref name="Input"/>.
	/// </summary>
	public Task<TOut> ProcessAsync(TIn Input) => _pipelineSteps(Input);
}

/// <summary>
/// Extension methods for chaining async pipeline steps together. Awaiting of a pending
/// input happens here, so individual steps only ever receive plain values.
/// </summary>
public static class AsyncPipelineStepExtensions
{
	/// <summary>
	/// Chains a step onto an input that is supplied as a task: awaits the input, with trace
	/// output before and after the wait, then runs the step on the resulting value.
	/// </summary>
	public static async Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		// The runtime type name identifies the concrete step in the trace output.
		var stepName = Step.GetType().Name;

		Console.WriteLine($"Awaiting input for {stepName}");
		var value = await Input;
		Console.WriteLine($"Got input for {stepName}");

		return await Step.ProcessAsync(value);
	}

	/// <summary>
	/// Chains a step onto an input that is supplied as a plain value: writes the trace
	/// message and runs the step on it directly.
	/// </summary>
	public static async Task<TOut> Step<TIn, TOut>(this TIn Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		var stepName = Step.GetType().Name;
		Console.WriteLine($"Got input for {stepName}");

		return await Step.ProcessAsync(Input);
	}
}

and the individual example steps end up looking like:

/// <summary>
/// Example step 1: fetches the content found at a Uri and returns it as a string.
/// </summary>
public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
	// One client instance, shared by every instance of this step.
	private static readonly HttpClient _client = new HttpClient();

	/// <summary>
	/// Writes the enter/leave trace messages and then awaits the HTTP fetch of <paramref name="Input"/>.
	/// </summary>
	public async Task<string> ProcessAsync(Uri Input)
	{
		// Both trace messages are written before the request is started.
		Console.WriteLine("Entering HttpFetchAsyncStep #1");
		Console.WriteLine("Leaving HttpFetchAsyncStep #1");

		var body = await _client.GetStringAsync(Input);
		return body;
	}
}

/// <summary>
/// Example step 2: replaces every "BBC" in the incoming text with "Not the BBC".
/// </summary>
public class ModifyTextAsyncStep : IAsyncPipelineStep<string, string>
{
	/// <summary>
	/// Returns <paramref name="Input"/> with the replacement applied, writing trace messages
	/// before and after the work.
	/// </summary>
	public async Task<string> ProcessAsync(string Input)
	{
		// There is no await in this body, so the work runs synchronously; the method stays
		// async so its result (or any exception) is delivered through the returned task.
		Console.WriteLine("Entering ModifyTextAsyncStep #2");
		var replaced = Input.Replace("BBC", "Not the BBC");
		Console.WriteLine("Leaving ModifyTextAsyncStep #2");

		return replaced;
	}
}

/// <summary>
/// Example step 3: writes the incoming text to "test.txt" in the current user's desktop
/// folder and returns the full path of the file that was written.
/// </summary>
public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
	/// <summary>
	/// Writes <paramref name="Input"/> to disk asynchronously.
	/// </summary>
	/// <returns>A task for the path of the file that was written.</returns>
	public async Task<string> ProcessAsync(string Input)
	{
		Console.WriteLine("Entering DiskWriteAsyncStep #3");

		// Build the target path from the desktop folder looked up at runtime.
		var targetPath = System.IO.Path.Combine(
			Environment.GetFolderPath(Environment.SpecialFolder.Desktop),
			"test.txt");

		await System.IO.File.WriteAllTextAsync(targetPath, Input);

		Console.WriteLine("Leaving DiskWriteAsyncStep #3");

		return targetPath;
	}
}

That gives the same overall behaviour, but doesn't require an abstract base for pipeline steps, and doesn't suffer from the "putting code before the await for the input" issue I mentioned before.

@wfigueiredo
Copy link

Hi Jeremy!
Sorry for the delay... I've not been feeling well the last few days (not Covid, though)

Yeah, you nailed it: it was not the order, but the async behaviour instead (wow, I'm really confused by all of this, lol...)
As I said, I'm new to C#/.NET so thanks again for your patience.

I did include your latest suggestion for AsyncPipelineStepExtensions and now works beautiful!
I think you really should include this Async approach to your gists... It helped me a lot, and can be of use for other devs too!
Amazing job, I really appreciate it! =D

gonna keep testing new scenarios, but I think most of the infrastructure is now solid enough to go on.
Thanks!!!! =)

@jermdavis
Copy link
Author

Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?

Glad you're getting on top of all this though - happy to have been able to help. Certainly not going to waste all this interesting material - what I've worked out will end up on my blog and I'll add new gists for the changed code as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment