Skip to content

Instantly share code, notes, and snippets.

@giacomociti
Created March 14, 2018 16:34
Show Gist options
  • Save giacomociti/1e95d95d6a0d02d60472e9207d8c2b9c to your computer and use it in GitHub Desktop.
Save giacomociti/1e95d95d6a0d02d60472e9207d8c2b9c to your computer and use it in GitHub Desktop.
Ordering behavior of dataflow
/// <summary>
/// Feeds an increasing integer sequence through a parallel <c>TransformBlock</c> linked to an
/// <c>ActionBlock</c>, makes the transform throw on one specific item, and then checks which
/// items reached the <c>ActionBlock</c>.
/// </summary>
/// <remarks>
/// Per the original author's notes, only the exception assertion passes; the two assertions about
/// the collected items are reported as failing. That is the behavior this test is meant to expose.
/// </remarks>
[Fact]
public async Task TransformBlockPreserveOrder()
{
    const int badItem = 89;
    var parallelOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 };
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    var received = new List<int>();
    var input = Enumerable.Range(1, int.MaxValue);

    // Identity transform that throws on exactly one input value.
    var mapBlock = new TransformBlock<int, int>(
        value =>
        {
            if (value == badItem)
            {
                throw new Exception("bug");
            }

            return value;
        },
        parallelOptions);

    // Collects whatever the transform block forwards downstream.
    var actionBlock = new ActionBlock<int>(value => received.Add(value));
    mapBlock.LinkTo(actionBlock, linkOptions);

    var ex = await Assert.ThrowsAsync<AggregateException>(() => Run(mapBlock, actionBlock, input));

    // Passes: the failure raised inside the transform is what surfaces from the pipeline.
    Assert.Equal("bug", ex.InnerException?.Message);

    // Reported as failing by the original author: the sink ends up with at least badItem items.
    // NOTE(review): presumably items after the bad one are still forwarded before the fault
    // takes effect - confirm against the TransformBlock documentation.
    Assert.True(received.Count < badItem);

    // Reported as failing by the original author: the collected items are not a prefix of the input.
    Assert.True(input.Take(received.Count).SequenceEqual(received));
}
/// <summary>
/// Pushes every element of <paramref name="input"/> into <paramref name="firstBlock"/>, then
/// completes that block and waits for <paramref name="lastBlock"/> to finish.
/// </summary>
/// <typeparam name="T">Type of the items sent to the first block.</typeparam>
/// <param name="firstBlock">Block that receives the items and is completed once sending stops.</param>
/// <param name="lastBlock">Block whose <c>Completion</c> task is awaited before returning.</param>
/// <param name="input">Items to send, in enumeration order.</param>
/// <returns>A task that finishes when <paramref name="lastBlock"/> has completed.</returns>
private static async Task Run<T>(ITargetBlock<T> firstBlock, IDataflowBlock lastBlock, IEnumerable<T> input)
{
    foreach (var element in input)
    {
        // Stop feeding as soon as a send is reported as not accepted.
        if (!await firstBlock.SendAsync(element))
        {
            break;
        }
    }

    // Signal that no more input will arrive, whether or not every item was sent.
    firstBlock.Complete();

    await lastBlock.Completion;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment