Skip to content

Instantly share code, notes, and snippets.

@pitermarx
Created November 3, 2023 15:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pitermarx/8d25cdb168bd18929d44abdd633cd109 to your computer and use it in GitHub Desktop.
Save pitermarx/8d25cdb168bd18929d44abdd633cd109 to your computer and use it in GitHub Desktop.
Idea for an abstraction over TPL Dataflow (System.Threading.Tasks.Dataflow)
/// <summary>
/// Fluent builder for a linear chain of TPL Dataflow blocks. Each instance wraps the current
/// tail of the chain; <c>Activity</c> appends a new step and returns a pipeline wrapping that step.
/// </summary>
/// <remarks>
/// Every appended step is a <see cref="TransformBlock{TInput, TOutput}"/> whose
/// <c>BoundedCapacity</c> is 1, and completion (or a fault) of a step is propagated to the
/// step linked after it.
/// NOTE(review): calling <c>Activity</c> more than once on the same instance links several
/// targets to one source; for non-broadcasting blocks Dataflow gives each item to a single
/// accepting target, not to all of them.
/// </remarks>
/// <typeparam name="T">Type of the items produced by the current tail of the chain.</typeparam>
public class DataPipeline<T>
{
    private readonly IReceivableSourceBlock<T> _tail;

    /// <summary>Wraps <paramref name="sourceBlock"/> as the current end of the pipeline.</summary>
    /// <param name="sourceBlock">The block whose output this pipeline exposes and extends.</param>
    public DataPipeline(IReceivableSourceBlock<T> sourceBlock)
    {
        _tail = sourceBlock;
    }

    /// <summary>The completion task of the wrapped block.</summary>
    public Task Completion => _tail.Completion;

    /// <summary>Streams the items produced by the wrapped block.</summary>
    /// <param name="ct">Token that stops the enumeration.</param>
    public IAsyncEnumerable<T> ReceiveAllAsync(CancellationToken ct = default)
    {
        return _tail.ReceiveAllAsync(ct);
    }

    /// <summary>Appends an asynchronous step that maps each item through <paramref name="action"/>.</summary>
    /// <returns>A pipeline ending at the new step.</returns>
    public DataPipeline<TOut> Activity<TOut>(Func<T, Task<TOut>> action)
    {
        var step = new TransformBlock<T, TOut>(action, CreateStepOptions());
        return Append(step);
    }

    /// <summary>Appends a synchronous step that maps each item through <paramref name="action"/>.</summary>
    /// <returns>A pipeline ending at the new step.</returns>
    public DataPipeline<TOut> Activity<TOut>(Func<T, TOut> action)
    {
        var step = new TransformBlock<T, TOut>(action, CreateStepOptions());
        return Append(step);
    }

    // A fresh options instance for every step; BoundedCapacity = 1 lets a step buffer only one item.
    private static ExecutionDataflowBlockOptions CreateStepOptions()
    {
        return new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
    }

    // Links the current tail to `step` (propagating completion) and returns a pipeline ending at `step`.
    private DataPipeline<TOut> Append<TOut>(TransformBlock<T, TOut> step)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        _tail.LinkTo(step, linkOptions);
        return new DataPipeline<TOut>(step);
    }
}
/// <summary>Helpers for running an <see cref="IEnumerable{T}"/> through a <c>DataPipeline</c>.</summary>
public static class DataPipelineExtensions
{
    /// <summary>
    /// Feeds the items of <paramref name="source"/> through the activities configured by
    /// <paramref name="flowConfig"/> and streams the output of the last activity.
    /// </summary>
    /// <remarks>
    /// Each activity runs concurrently with the other activities, but only one item at a time
    /// passes through each activity.
    /// Cancelling <paramref name="ct"/> only stops new items from being pulled from
    /// <paramref name="source"/>: items already inside the pipeline are still processed and
    /// yielded, and the sequence then ends without throwing.
    /// If the consumer stops enumerating early (break / dispose), or the pipeline ends while the
    /// producer is still feeding it, the producer is cancelled and awaited so the source
    /// enumerator is disposed. If the source's <c>MoveNext</c> blocks, disposal waits for it.
    /// </remarks>
    /// <param name="source">Items to feed into the pipeline; enumerated at most once.</param>
    /// <param name="flowConfig">Builds the chain of activities, starting from the input pipeline.</param>
    /// <param name="ct">Stops pulling new items from <paramref name="source"/> when cancelled.</param>
    /// <returns>The items produced by the pipeline returned from <paramref name="flowConfig"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="flowConfig"/> is null (raised on the first <c>MoveNextAsync</c>).
    /// </exception>
    /// <remarks>
    /// An exception thrown by the source or by an activity is surfaced, after the output has been
    /// drained, by awaiting the pipeline's <c>Completion</c> task.
    /// </remarks>
    public static async IAsyncEnumerable<TOut> CreateDataPipeline<T, TOut>(
        this IEnumerable<T> source,
        Func<DataPipeline<T>, DataPipeline<TOut>> flowConfig,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(flowConfig);

        var bufferBlock = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = 1 });
        var flow = flowConfig(new DataPipeline<T>(bufferBlock));

        // Linked so the producer can be stopped either by the caller's token or by this iterator
        // when enumeration ends (otherwise it could stay parked in SendAsync on a full buffer forever).
        using var producerCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        var producer = source.ConsumeItemsAsync(bufferBlock, producerCts.Token);

        try
        {
            // Deliberately not cancellable: every item that entered the pipeline is drained.
            // Only the token passed to the producer stops new items from entering.
            await foreach (var item in flow.ReceiveAllAsync(CancellationToken.None))
            {
                yield return item;
            }

            // Awaiting Completion surfaces exceptions raised by the source or by an activity.
            await flow.Completion;
        }
        finally
        {
            // Early exit (consumer break/dispose, or a faulted activity leaving the buffer full)
            // must not leave the producer blocked with the source enumerator undisposed.
            producerCts.Cancel();
            await producer;
        }
    }

    // Pushes the items of `source` into `bufferBlock` (waiting while it is full) and then completes it.
    // Once `ct` is cancelled no further item is pulled and the block is completed normally.
    // An exception raised while enumerating `source` faults the block instead, so the pipeline's
    // Completion reports it. All exceptions are handled here; the returned task never faults.
    private static async Task ConsumeItemsAsync<T>(this IEnumerable<T> source, BufferBlock<T> bufferBlock, CancellationToken ct = default)
    {
        try
        {
            using var enumerator = source.GetEnumerator();

            // Check the token before pulling each item so nothing extra is read from the source
            // after cancellation.
            while (!ct.IsCancellationRequested && enumerator.MoveNext())
            {
                await bufferBlock.SendAsync(enumerator.Current, ct);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Our own cancellation is the expected way to stop early; complete the block below.
        }
        catch (Exception e)
        {
            // Fault is an explicit interface implementation on BufferBlock<T>.
            ((IDataflowBlock)bufferBlock).Fault(e);
            return;
        }

        bufferBlock.Complete();
    }
}
using System.Runtime.CompilerServices;
using System.Threading.Tasks.Dataflow;
// Demo: stream letters through three activities and cancel once an item containing 'c' arrives.
using var cancellationTokenSource = new CancellationTokenSource();

// If this enumerable is fully consumed, it throws when it reaches "h".
// The cancellation below is meant to stop the producer before it pulls "h"; letters that are
// already inside the pipeline are still processed and printed.
var enumerableThatThrows = new[] { "a", "b", "c", "d", "e", "f", "g", "h", "i" }
    .Select(l => l == "h" ? throw new InvalidOperationException("Source failed at \"h\".") : l);

var dataFlow = enumerableThatThrows
    // Helper method to create a data pipeline from an IEnumerable
    .CreateDataPipeline(
        pipeline => pipeline
            // Each activity runs concurrently with the other activities...
            .Activity(letter => { Console.WriteLine(letter); return letter[0]; })
            // ...but only one item at a time is processed by each activity.
            // Repeats the letter as many times as its character code truncated to a byte (97 for 'a').
            .Activity(async letter => { await Task.Delay(1000); return new string(letter, (byte)letter); })
            // Activities can be sync or async
            .Activity(item => item + "\n"),
        cancellationTokenSource.Token);

Console.WriteLine("Starting...");
await foreach (var item in dataFlow)
{
    Console.Write(item);

    // Cancelling stops new letters from entering the pipeline; items already inside are still yielded.
    if (item.Contains('c'))
    {
        cancellationTokenSource.Cancel();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment