Skip to content

Instantly share code, notes, and snippets.

@jmichas
Last active December 12, 2021 14:57
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save jmichas/bfab9cec84f0d1e40e12 to your computer and use it in GitHub Desktop.
Save jmichas/bfab9cec84f0d1e40e12 to your computer and use it in GitHub Desktop.
This gist illustrates a branching TPL Dataflow pipeline, the various ways to propagate completion through it, and why some of those ways work and some don't.
/// <summary>
/// Builds a small branching TPL Dataflow pipeline
/// (MainBlock -> Block1 / Block2 / Block3 -> EndBlock) in several variants that differ
/// only in how completion is propagated from one stage to the next.
/// </summary>
/// <remarks>
/// Every <c>CreatePipeline*</c> method replaces all five block properties with freshly
/// created blocks. The variants are kept side by side for comparison, including the ones
/// whose completion strategy can complete <see cref="EndBlock"/> too early.
/// </remarks>
public class DataflowTestPipeline
{
    // Default matches the 5000 ms pause the asynchronous helpers used to hard-code.
    private TimeSpan _simulatedWorkDelay = TimeSpan.FromMilliseconds(5000);

    /// <summary>Entry block; routes each item to a branch based on its status.</summary>
    public TransformBlock<Thing, Thing> MainBlock { get; set; }

    /// <summary>Branch that receives items whose status is <see cref="Status.Complete"/>.</summary>
    public TransformBlock<Thing, Thing> Block1 { get; set; }

    /// <summary>Branch that receives items whose status is <see cref="Status.Cancelled"/>.</summary>
    public TransformBlock<Thing, Thing> Block2 { get; set; }

    /// <summary>Branch that receives items whose status is <see cref="Status.Delayed"/>.</summary>
    public TransformBlock<Thing, Thing> Block3 { get; set; }

    /// <summary>Final block that all three branches feed into.</summary>
    public ActionBlock<Thing> EndBlock { get; set; }

    /// <summary>
    /// Pause applied by the asynchronous pipeline variants before each piece of simulated
    /// work. Defaults to 5000 ms; the value is read each time an item is processed.
    /// </summary>
    public TimeSpan SimulatedWorkDelay
    {
        get { return _simulatedWorkDelay; }
        set { _simulatedWorkDelay = value; }
    }

    /// <summary>
    /// Synchronous delegates; every link is created with <c>PropagateCompletion = true</c>.
    /// </summary>
    /// <remarks>
    /// All three branches propagate completion into the same <see cref="EndBlock"/>, so
    /// EndBlock is asked to complete as soon as the first branch completes - possibly before
    /// another branch has handed over its output.
    /// </remarks>
    public void CreatePipelineSynchronousWithLinkOptions()
    {
        MainBlock = CreateSynchronousMainBlock(false);
        CreateCommonSynchronousBlocks();
        LinkBlocks(new DataflowLinkOptions { PropagateCompletion = true });
    }

    /// <summary>
    /// Synchronous delegates; completion is propagated by hand with continuations.
    /// </summary>
    /// <remarks>
    /// While processing an item, MainBlock registers a continuation on its own completion
    /// that completes the branch matching that item's status, so a branch that never
    /// receives an item is never completed. EndBlock is completed when any branch completes.
    /// </remarks>
    public void CreatePipelineSynchronousConditionalContinueWith()
    {
        MainBlock = CreateSynchronousMainBlock(true);
        CreateCommonSynchronousBlocks();
        LinkBlocks(new DataflowLinkOptions());
        CompleteEndBlockWhenAnyBranchCompletes();
    }

    /// <summary>
    /// Asynchronous delegates; every link is created with <c>PropagateCompletion = true</c>.
    /// </summary>
    /// <remarks>
    /// Same wiring as <see cref="CreatePipelineSynchronousWithLinkOptions"/>: the first branch
    /// to complete also completes <see cref="EndBlock"/>. Branches that received no item
    /// complete without waiting for <see cref="SimulatedWorkDelay"/>, so EndBlock can finish
    /// while the busy branch is still working.
    /// </remarks>
    public void CreatePipelineAsynchronousWithLinkOptions()
    {
        MainBlock = CreateAsynchronousMainBlock(false);
        CreateCommonAsynchronousBlocks();
        LinkBlocks(new DataflowLinkOptions { PropagateCompletion = true });
    }

    /// <summary>
    /// Asynchronous delegates; completion is propagated by hand with continuations, exactly
    /// as in <see cref="CreatePipelineSynchronousConditionalContinueWith"/>.
    /// </summary>
    public void CreatePipelineAsynchronousConditionalContinueWith()
    {
        MainBlock = CreateAsynchronousMainBlock(true);
        CreateCommonAsynchronousBlocks();
        LinkBlocks(new DataflowLinkOptions());
        CompleteEndBlockWhenAnyBranchCompletes();
    }

    /// <summary>
    /// Asynchronous delegates; all branches are completed once MainBlock completes, and
    /// EndBlock is completed only after all three branches have completed.
    /// </summary>
    public void CreatePipelineAsynchronousTaskWhenAll()
    {
        MainBlock = CreateAsynchronousMainBlock(false);
        CreateCommonAsynchronousBlocks();
        LinkBlocks(new DataflowLinkOptions());

        MainBlock.Completion.ContinueWith(t =>
        {
            Debug.WriteLine("MainBlock Complete..Continuing With...");
            Block1.Complete();
            Block2.Complete();
            Block3.Complete();
        });

        // Waiting for every branch keeps EndBlock open until the slowest branch is done.
        Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
            .ContinueWith(t =>
            {
                Debug.WriteLine("All Blocks Complete..Continuing With...");
                EndBlock.Complete();
            });
    }

    /// <summary>Creates the synchronous entry block.</summary>
    /// <param name="completeBranchPerItem">
    /// When true, each processed item also registers the manual branch-completion continuation.
    /// </param>
    private TransformBlock<Thing, Thing> CreateSynchronousMainBlock(bool completeBranchPerItem)
    {
        return new TransformBlock<Thing, Thing>(thing =>
        {
            Debug.WriteLine("MainBlock");
            DoGeneralStuffToThing(thing);
            if (completeBranchPerItem)
            {
                RegisterBranchCompletion(thing);
            }
            return thing;
        });
    }

    /// <summary>Creates the asynchronous entry block.</summary>
    /// <param name="completeBranchPerItem">
    /// When true, each processed item also registers the manual branch-completion continuation.
    /// </param>
    private TransformBlock<Thing, Thing> CreateAsynchronousMainBlock(bool completeBranchPerItem)
    {
        return new TransformBlock<Thing, Thing>(async thing =>
        {
            Debug.WriteLine("MainBlock");
            await DoGeneralStuffToThingAsync(thing);
            if (completeBranchPerItem)
            {
                RegisterBranchCompletion(thing);
            }
            return thing;
        });
    }

    /// <summary>
    /// Registers a continuation on MainBlock's completion that completes the branch
    /// matching <paramref name="thing"/>'s status.
    /// </summary>
    private void RegisterBranchCompletion(Thing thing)
    {
        // The block properties are read inside the lambdas, i.e. when the continuation runs.
        switch (thing.Status)
        {
            case Status.Complete:
                MainBlock.Completion.ContinueWith(t => Block1.Complete());
                break;
            case Status.Cancelled:
                MainBlock.Completion.ContinueWith(t => Block2.Complete());
                break;
            case Status.Delayed:
                MainBlock.Completion.ContinueWith(t => Block3.Complete());
                break;
        }
    }

    /// <summary>
    /// Links MainBlock to the three branches by status and each branch to EndBlock,
    /// using <paramref name="linkOptions"/> for every link.
    /// </summary>
    private void LinkBlocks(DataflowLinkOptions linkOptions)
    {
        MainBlock.LinkTo(Block1, linkOptions, x => x.Status == Status.Complete);
        MainBlock.LinkTo(Block2, linkOptions, x => x.Status == Status.Cancelled);
        MainBlock.LinkTo(Block3, linkOptions, x => x.Status == Status.Delayed);

        // Linked last: catches any item none of the predicates above accepted, so an
        // unroutable item cannot sit in MainBlock's output and stall the pipeline.
        MainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

        Block1.LinkTo(EndBlock, linkOptions);
        Block2.LinkTo(EndBlock, linkOptions);
        Block3.LinkTo(EndBlock, linkOptions);
    }

    /// <summary>Completes EndBlock when any one of the three branches completes.</summary>
    private void CompleteEndBlockWhenAnyBranchCompletes()
    {
        Block1.Completion.ContinueWith(t => EndBlock.Complete());
        Block2.Completion.ContinueWith(t => EndBlock.Complete());
        Block3.Completion.ContinueWith(t => EndBlock.Complete());
    }

    /// <summary>Creates the three branch blocks with synchronous delegates, plus EndBlock.</summary>
    private void CreateCommonSynchronousBlocks()
    {
        Block1 = CreateSynchronousBranch(1);
        Block2 = CreateSynchronousBranch(2);
        Block3 = CreateSynchronousBranch(3);
        EndBlock = CreateEndBlock();
    }

    /// <summary>Creates the three branch blocks with asynchronous delegates, plus EndBlock.</summary>
    private void CreateCommonAsynchronousBlocks()
    {
        Block1 = CreateAsynchronousBranch(1);
        Block2 = CreateAsynchronousBranch(2);
        Block3 = CreateAsynchronousBranch(3);
        EndBlock = CreateEndBlock();
    }

    /// <summary>Creates branch number <paramref name="which"/> (1, 2 or 3), synchronous flavour.</summary>
    private TransformBlock<Thing, Thing> CreateSynchronousBranch(int which)
    {
        string label = "Block" + which;
        return new TransformBlock<Thing, Thing>(thing =>
        {
            Debug.WriteLine(label);
            DoWorkOnThing(thing, which);
            return thing;
        });
    }

    /// <summary>Creates branch number <paramref name="which"/> (1, 2 or 3), asynchronous flavour.</summary>
    private TransformBlock<Thing, Thing> CreateAsynchronousBranch(int which)
    {
        string label = "Block" + which;
        return new TransformBlock<Thing, Thing>(async thing =>
        {
            Debug.WriteLine(label);
            await DoWorkOnThingAsync(thing, which);
            return thing;
        });
    }

    /// <summary>Creates the terminal block, which only traces that an item arrived.</summary>
    private static ActionBlock<Thing> CreateEndBlock()
    {
        return new ActionBlock<Thing>(thing => { Debug.WriteLine("EndBlock"); });
    }

    /// <summary>Work done by MainBlock: stamps the item's name.</summary>
    private void DoGeneralStuffToThing(Thing thing)
    {
        thing.Name = "I am thing";
    }

    /// <summary>
    /// Asynchronous flavour of <see cref="DoGeneralStuffToThing"/>: waits
    /// <see cref="SimulatedWorkDelay"/> and then does the same work.
    /// </summary>
    private async Task DoGeneralStuffToThingAsync(Thing thing)
    {
        // Awaiting the delay, rather than blocking on it inside Task.Run, keeps a
        // thread-pool thread from being parked for the whole wait.
        await Task.Delay(SimulatedWorkDelay).ConfigureAwait(false);
        DoGeneralStuffToThing(thing);
    }

    /// <summary>
    /// Work done by a branch: records the current UTC time in the timestamp property
    /// belonging to branch <paramref name="which"/>. Other values of
    /// <paramref name="which"/> leave the item untouched.
    /// </summary>
    private void DoWorkOnThing(Thing thing, int which)
    {
        switch (which)
        {
            case 1:
                thing.ProcessedByBlock1 = DateTime.UtcNow;
                break;
            case 2:
                thing.ProcessedByBlock2 = DateTime.UtcNow;
                break;
            case 3:
                thing.ProcessedByBlock3 = DateTime.UtcNow;
                break;
        }
    }

    /// <summary>
    /// Asynchronous flavour of <see cref="DoWorkOnThing"/>: waits
    /// <see cref="SimulatedWorkDelay"/> and then does the same work.
    /// </summary>
    private async Task DoWorkOnThingAsync(Thing thing, int which)
    {
        await Task.Delay(SimulatedWorkDelay).ConfigureAwait(false);
        DoWorkOnThing(thing, which);
    }

    /// <summary>Item that flows through the pipeline; mutated in place by the blocks.</summary>
    public class Thing
    {
        /// <summary>Decides which branch MainBlock routes the item to.</summary>
        public Status Status { get; set; }

        /// <summary>UTC time Block1 processed the item; null if it never did.</summary>
        public DateTime? ProcessedByBlock1 { get; set; }

        /// <summary>UTC time Block2 processed the item; null if it never did.</summary>
        public DateTime? ProcessedByBlock2 { get; set; }

        /// <summary>UTC time Block3 processed the item; null if it never did.</summary>
        public DateTime? ProcessedByBlock3 { get; set; }

        /// <summary>Set by MainBlock when it processes the item.</summary>
        public string Name { get; set; }
    }

    /// <summary>Routing key for <see cref="Thing"/>.</summary>
    public enum Status
    {
        Complete,
        Cancelled,
        Delayed
    }
}
/// <summary>
/// Runs one item with status Complete through each pipeline variant built by
/// <see cref="DataflowTestPipeline"/> and checks that Block1 processed it by the time
/// EndBlock reports completion.
/// </summary>
[TestClass]
public class PipelineTests
{
    [TestMethod]
    public async Task TestDataFlowBlockPipeline_SynchronousUsingPropagateCompletion()
    {
        var pipe = new DataflowTestPipeline();
        pipe.CreatePipelineSynchronousWithLinkOptions(); // uses DataflowLinkOptions to propagate completion

        await AssertCompleteThingIsProcessedByBlock1Async(pipe);
    }

    [TestMethod]
    public async Task TestDataFlowBlockPipeline_SynchronousUsingConditionalContinueWithInTheBlock()
    {
        var pipe = new DataflowTestPipeline();
        pipe.CreatePipelineSynchronousConditionalContinueWith(); // uses conditional logic to propagate completion

        await AssertCompleteThingIsProcessedByBlock1Async(pipe);
    }

    // This one always fails because blocks 2 and 3 "complete" and propagate to EndBlock
    // before block1 completes.
    [TestMethod]
    public async Task TestDataFlowBlockPipeline_AsynchronousUsingPropagateCompletion()
    {
        var pipe = new DataflowTestPipeline();
        pipe.CreatePipelineAsynchronousWithLinkOptions(); // uses DataflowLinkOptions to propagate completion

        await AssertCompleteThingIsProcessedByBlock1Async(pipe);
    }

    [TestMethod]
    public async Task TestDataFlowBlockPipeline_AsynchronousUsingConditionalContinueWithInTheBlock()
    {
        var pipe = new DataflowTestPipeline();
        pipe.CreatePipelineAsynchronousConditionalContinueWith(); // uses conditional logic to propagate completion

        await AssertCompleteThingIsProcessedByBlock1Async(pipe);
    }

    [TestMethod]
    public async Task TestDataFlowBlockPipeline_AsynchronousUsingTaskWhenAll()
    {
        var pipe = new DataflowTestPipeline();
        pipe.CreatePipelineAsynchronousTaskWhenAll();

        await AssertCompleteThingIsProcessedByBlock1Async(pipe);
    }

    /// <summary>
    /// Shared body of every test: posts a single Complete item, completes MainBlock,
    /// waits for EndBlock to complete, then asserts that Block1 stamped the item.
    /// </summary>
    /// <param name="pipe">Pipeline whose blocks have already been created and linked.</param>
    private static async Task AssertCompleteThingIsProcessedByBlock1Async(DataflowTestPipeline pipe)
    {
        var completeThing = new DataflowTestPipeline.Thing
        {
            Status = DataflowTestPipeline.Status.Complete
        };

        // Post returns false when the block declines the message; fail here rather than
        // letting a dropped item surface later as a hang or an unrelated assertion failure.
        Assert.IsTrue(pipe.MainBlock.Post(completeThing), "MainBlock declined the posted item.");
        pipe.MainBlock.Complete();

        await pipe.EndBlock.Completion;

        Debug.WriteLine("Doing Assert");
        Assert.IsNotNull(completeThing.ProcessedByBlock1);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment