@jermdavis
Last active February 15, 2023 11:33
An initial (not great) example of asynchronous pipeline code. See the blog post for more info: https://blog.jermdavis.dev/posts/2021/pipelines-and-async However, you probably want this gist instead, because it's the (hopefully better) v2: https://gist.github.com/jermdavis/49ecd692a16b10899eb2ee2b50770499
// Namespaces used: System, System.IO, System.Net.Http, System.Threading.Tasks.

// Entry point: fetch a page and report where it was saved.
async Task Main()
{
    var pipeline = new ExampleAsyncPipeline();
    var uri = new Uri("https://news.bbc.co.uk/");
    var tempFile = await pipeline.ProcessAsync(uri);
    Console.WriteLine($"{uri} saved to {tempFile}");
}

// A single step: takes a task producing TIn and returns a task producing TOut.
public interface IAsyncPipelineStep<TIn, TOut>
{
    Task<TOut> ProcessAsync(Task<TIn> Input);
}

// A pipeline is itself a step, so pipelines can be nested inside other pipelines.
public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
    // The composed chain of steps; derived classes assign this in their constructor.
    public Func<Task<TIn>, Task<TOut>> _pipelineSteps { get; protected set; }

    public Task<TOut> ProcessAsync(Task<TIn> Input)
    {
        return _pipelineSteps(Input);
    }

    // Convenience overload: wrap a plain value in a completed task.
    public Task<TOut> ProcessAsync(TIn Input)
    {
        var inputTask = Task.FromResult(Input);
        return _pipelineSteps(inputTask);
    }
}

public static class AsyncPipelineStepExtensions
{
    // Lets steps be chained fluently: input.Step(a).Step(b)
    public static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
    {
        return Step.ProcessAsync(Input);
    }
}

// Downloads the content at the given URI as a string.
public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
    // One shared HttpClient for all instances of this step.
    private static readonly HttpClient _client = new HttpClient();

    public async Task<string> ProcessAsync(Task<Uri> Input)
    {
        var uri = await Input;
        return await _client.GetStringAsync(uri);
    }
}

// Writes the incoming text to a new temp file and returns the file's path.
public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
    public async Task<string> ProcessAsync(Task<string> Input)
    {
        var data = await Input;
        var fileName = System.IO.Path.GetTempFileName();
        await System.IO.File.WriteAllTextAsync(fileName, data);
        return fileName;
    }
}

// Example pipeline: fetch a URI, then save the response to disk.
public class ExampleAsyncPipeline : AsyncPipeline<Uri, string>
{
    public ExampleAsyncPipeline()
    {
        _pipelineSteps = input => input
            .Step(new HttpFetchAsyncStep())
            .Step(new DiskWriteAsyncStep());
    }
}
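
To try the chaining pattern without network or disk access, here is a minimal sketch that repeats the gist's interface and `Step` extension and adds two steps of its own. `TrimStep`, `LengthStep` and `Demo` are hypothetical names invented for illustration and are not part of the gist. The second step changes the type flowing through the chain from `string` to `int`.

```csharp
using System;
using System.Threading.Tasks;

// Same shape as the gist's interface, repeated so the sketch compiles on its own.
public interface IAsyncPipelineStep<TIn, TOut>
{
    Task<TOut> ProcessAsync(Task<TIn> Input);
}

public static class AsyncPipelineStepExtensions
{
    // Same behaviour as the gist's Step extension; the parameter is lower-cased here.
    public static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> step)
    {
        return step.ProcessAsync(Input);
    }
}

// Hypothetical step (not in the gist): trims surrounding whitespace.
public class TrimStep : IAsyncPipelineStep<string, string>
{
    public async Task<string> ProcessAsync(Task<string> Input)
    {
        var text = await Input;
        return text.Trim();
    }
}

// Hypothetical step (not in the gist): maps a string to its length.
public class LengthStep : IAsyncPipelineStep<string, int>
{
    public async Task<int> ProcessAsync(Task<string> Input)
    {
        var text = await Input;
        return text.Length;
    }
}

// Hypothetical helper that composes the two steps above.
public static class Demo
{
    public static Task<int> RunAsync(string raw)
    {
        return Task.FromResult(raw)
            .Step(new TrimStep())
            .Step(new LengthStep());
    }
}
```

Awaiting `Demo.RunAsync("  hello  ")` yields `5`. The compiler checks that each step's output type matches the next step's input type, so swapping the order of the two steps would not compile.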

aroques commented Feb 10, 2023

Thank you, Jeremy. I found your Gists through your blog posts about pipelines. I haven't finished reading all of them, but they are really interesting and helpful. I am using this async pipeline code as a starting point. Also, I wanted to share a ThenOptionally extension method that I wrote. (I renamed Step to Then because I think it reads better.)

        public async static Task<TOutput> ThenOptionally<TInput, TOutput>(this Task<TInput> input, Func<Task<TInput>, Task<bool>> shouldProcessStep, IAsyncPipelineStep<TInput, TOutput> step) where TInput : TOutput
        {
            if (await shouldProcessStep(input))
            {
                return await step.ProcessAsync(input);
            }
            else
            {
                return await input;
            }
        }
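
As a rough sketch of how the `ThenOptionally` method above might be called, the block below repeats the interface and the extension method and adds a step and a helper. `UpperCaseStep` and `OptionalDemo` are hypothetical names used only for illustration. The step runs only when the input starts with `!`.

```csharp
using System;
using System.Threading.Tasks;

public interface IAsyncPipelineStep<TIn, TOut>
{
    Task<TOut> ProcessAsync(Task<TIn> Input);
}

public static class AsyncPipelineStepExtensions
{
    // ThenOptionally with the same logic as posted above.
    public static async Task<TOutput> ThenOptionally<TInput, TOutput>(
        this Task<TInput> input,
        Func<Task<TInput>, Task<bool>> shouldProcessStep,
        IAsyncPipelineStep<TInput, TOutput> step) where TInput : TOutput
    {
        if (await shouldProcessStep(input))
        {
            return await step.ProcessAsync(input);
        }
        // Step skipped: pass the input through unchanged.
        return await input;
    }
}

// Hypothetical step (illustration only): upper-cases the text.
public class UpperCaseStep : IAsyncPipelineStep<string, string>
{
    public async Task<string> ProcessAsync(Task<string> Input)
    {
        var text = await Input;
        return text.ToUpperInvariant();
    }
}

// Hypothetical helper: applies UpperCaseStep only to inputs that start with '!'.
public static class OptionalDemo
{
    public static Task<string> RunAsync(string raw)
    {
        return Task.FromResult(raw)
            .ThenOptionally(
                async t => (await t).StartsWith("!", StringComparison.Ordinal),
                new UpperCaseStep());
    }
}
```

Awaiting `OptionalDemo.RunAsync("!hello")` gives `"!HELLO"`, while `OptionalDemo.RunAsync("hello")` returns `"hello"` untouched. The `where TInput : TOutput` constraint is what makes the skipped branch legal: because the input is returned as the output, an optional step cannot change the value to an unrelated type.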

@jermdavis
Author

Thanks - glad it's of help.
