Last active
December 12, 2021 14:57
-
-
Save jmichas/bfab9cec84f0d1e40e12 to your computer and use it in GitHub Desktop.
This gist illustrates a branching TPL Dataflow pipeline, the various ways to propagate completion through it, and why some of them work and some don't.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Demonstration harness for a branching TPL Dataflow pipeline:
/// <c>MainBlock</c> routes each <see cref="Thing"/> to <c>Block1</c>, <c>Block2</c> or
/// <c>Block3</c> according to its <see cref="Thing.Status"/>, and all three branches
/// feed <c>EndBlock</c>. Each <c>CreatePipeline...</c> method wires the same topology
/// but signals completion in a different way so the approaches can be compared.
/// </summary>
public class DataflowTestPipeline
{
    /// <summary>
    /// Creates a pipeline holder whose simulated asynchronous work takes five seconds,
    /// the delay this demonstration has always used.
    /// </summary>
    public DataflowTestPipeline()
    {
        SimulatedWorkDelay = TimeSpan.FromSeconds(5);
    }

    /// <summary>Entry block; callers post items here and call Complete() on it.</summary>
    public TransformBlock<Thing, Thing> MainBlock { get; set; }

    /// <summary>Branch that receives items whose status is <see cref="Status.Complete"/>.</summary>
    public TransformBlock<Thing, Thing> Block1 { get; set; }

    /// <summary>Branch that receives items whose status is <see cref="Status.Cancelled"/>.</summary>
    public TransformBlock<Thing, Thing> Block2 { get; set; }

    /// <summary>Branch that receives items whose status is <see cref="Status.Delayed"/>.</summary>
    public TransformBlock<Thing, Thing> Block3 { get; set; }

    /// <summary>Terminal block that all three branches are linked to.</summary>
    public ActionBlock<Thing> EndBlock { get; set; }

    /// <summary>
    /// Length of the artificial pause used by the asynchronous pipelines before they
    /// touch a <see cref="Thing"/>. Defaults to five seconds; set it before posting items.
    /// </summary>
    public TimeSpan SimulatedWorkDelay { get; set; }

    /// <summary>
    /// Builds the pipeline from synchronous blocks and relies on
    /// <c>DataflowLinkOptions.PropagateCompletion</c> on every link.
    /// </summary>
    /// <remarks>
    /// Three branches propagate completion into the single <c>EndBlock</c>, so
    /// <c>EndBlock</c> is told to complete as soon as any one branch completes.
    /// </remarks>
    public void CreatePipelineSynchronousWithLinkOptions()
    {
        MainBlock = new TransformBlock<Thing, Thing>(thing =>
        {
            Debug.WriteLine("MainBlock");
            DoGeneralStuffToThing(thing);
            return thing;
        });

        CreateCommonSynchronousBlocks();
        LinkBranches(true);
    }

    /// <summary>
    /// Builds the pipeline from synchronous blocks without link-level propagation.
    /// Instead, while processing an item, <c>MainBlock</c> registers a continuation on
    /// its own completion that completes the branch matching the item's status.
    /// </summary>
    /// <remarks>
    /// A branch that never sees an item is never completed by this scheme, and one
    /// continuation is registered per processed item.
    /// </remarks>
    public void CreatePipelineSynchronousConditionalContinueWith()
    {
        MainBlock = new TransformBlock<Thing, Thing>(thing =>
        {
            Debug.WriteLine("MainBlock");
            DoGeneralStuffToThing(thing);
            CompleteMatchingBranchAfterMainBlock(thing);
            return thing;
        });

        CreateCommonSynchronousBlocks();
        LinkBranches(false);
        CompleteEndBlockAfterAnyBranch();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="CreatePipelineSynchronousWithLinkOptions"/>:
    /// same wiring, but every transform awaits the simulated delay.
    /// </summary>
    /// <remarks>
    /// Because branches that received nothing complete immediately and propagate to
    /// <c>EndBlock</c>, <c>EndBlock</c> can finish while another branch is still working.
    /// </remarks>
    public void CreatePipelineAsynchronousWithLinkOptions()
    {
        MainBlock = new TransformBlock<Thing, Thing>(async thing =>
        {
            Debug.WriteLine("MainBlock");
            await DoGeneralStuffToThingAsync(thing);
            return thing;
        });

        CreateCommonAsynchronousBlocks();
        LinkBranches(true);
    }

    /// <summary>
    /// Asynchronous counterpart of
    /// <see cref="CreatePipelineSynchronousConditionalContinueWith"/>.
    /// </summary>
    public void CreatePipelineAsynchronousConditionalContinueWith()
    {
        MainBlock = new TransformBlock<Thing, Thing>(async thing =>
        {
            Debug.WriteLine("MainBlock");
            await DoGeneralStuffToThingAsync(thing);
            CompleteMatchingBranchAfterMainBlock(thing);
            return thing;
        });

        CreateCommonAsynchronousBlocks();
        LinkBranches(false);
        CompleteEndBlockAfterAnyBranch();
    }

    /// <summary>
    /// Builds the asynchronous pipeline and propagates completion explicitly:
    /// all three branches are completed once <c>MainBlock</c> completes, and
    /// <c>EndBlock</c> is completed only after every branch has completed.
    /// </summary>
    public void CreatePipelineAsynchronousTaskWhenAll()
    {
        MainBlock = new TransformBlock<Thing, Thing>(async thing =>
        {
            Debug.WriteLine("MainBlock");
            await DoGeneralStuffToThingAsync(thing);
            return thing;
        });

        CreateCommonAsynchronousBlocks();
        LinkBranches(false);

        MainBlock.Completion.ContinueWith(t =>
        {
            Debug.WriteLine("MainBlock Complete..Continuing With...");
            Block1.Complete();
            Block2.Complete();
            Block3.Complete();
        });

        // Waiting for all branches (not just the first) keeps EndBlock open
        // until no branch can still hand it an item.
        Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
            .ContinueWith(t =>
            {
                Debug.WriteLine("All Blocks Complete..Continuing With...");
                EndBlock.Complete();
            });
    }

    /// <summary>
    /// Links MainBlock to the three branches by status (with a null target as the
    /// catch-all for anything unmatched) and links every branch to EndBlock.
    /// </summary>
    /// <param name="propagateCompletion">
    /// Value assigned to <c>DataflowLinkOptions.PropagateCompletion</c> for every link.
    /// </param>
    private void LinkBranches(bool propagateCompletion)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = propagateCompletion };

        MainBlock.LinkTo(Block1, linkOptions, x => x.Status == Status.Complete);
        MainBlock.LinkTo(Block2, linkOptions, x => x.Status == Status.Cancelled);
        MainBlock.LinkTo(Block3, linkOptions, x => x.Status == Status.Delayed);
        MainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

        Block1.LinkTo(EndBlock, linkOptions);
        Block2.LinkTo(EndBlock, linkOptions);
        Block3.LinkTo(EndBlock, linkOptions);
    }

    /// <summary>
    /// Registers a continuation on MainBlock's completion that completes the branch
    /// corresponding to the status of <paramref name="thing"/>.
    /// </summary>
    private void CompleteMatchingBranchAfterMainBlock(Thing thing)
    {
        switch (thing.Status)
        {
            case Status.Complete:
                MainBlock.Completion.ContinueWith(t => Block1.Complete());
                break;
            case Status.Cancelled:
                MainBlock.Completion.ContinueWith(t => Block2.Complete());
                break;
            case Status.Delayed:
                MainBlock.Completion.ContinueWith(t => Block3.Complete());
                break;
        }
    }

    /// <summary>
    /// Completes EndBlock when any single branch completes; each branch gets its own
    /// continuation, so the first one to finish triggers the call.
    /// </summary>
    private void CompleteEndBlockAfterAnyBranch()
    {
        Block1.Completion.ContinueWith(t => EndBlock.Complete());
        Block2.Completion.ContinueWith(t => EndBlock.Complete());
        Block3.Completion.ContinueWith(t => EndBlock.Complete());
    }

    /// <summary>Creates Block1-Block3 with synchronous delegates, plus EndBlock.</summary>
    private void CreateCommonSynchronousBlocks()
    {
        Block1 = CreateSynchronousBranch(1);
        Block2 = CreateSynchronousBranch(2);
        Block3 = CreateSynchronousBranch(3);
        EndBlock = CreateEndBlock();
    }

    /// <summary>Creates Block1-Block3 with asynchronous delegates, plus EndBlock.</summary>
    private void CreateCommonAsynchronousBlocks()
    {
        Block1 = CreateAsynchronousBranch(1);
        Block2 = CreateAsynchronousBranch(2);
        Block3 = CreateAsynchronousBranch(3);
        EndBlock = CreateEndBlock();
    }

    /// <summary>Builds a branch that logs "Block{which}" and stamps the item synchronously.</summary>
    private TransformBlock<Thing, Thing> CreateSynchronousBranch(int which)
    {
        string label = "Block" + which;
        return new TransformBlock<Thing, Thing>(thing =>
        {
            Debug.WriteLine(label);
            DoWorkOnThing(thing, which);
            return thing;
        });
    }

    /// <summary>Builds a branch that logs "Block{which}" and stamps the item after the simulated delay.</summary>
    private TransformBlock<Thing, Thing> CreateAsynchronousBranch(int which)
    {
        string label = "Block" + which;
        return new TransformBlock<Thing, Thing>(async thing =>
        {
            Debug.WriteLine(label);
            await DoWorkOnThingAsync(thing, which);
            return thing;
        });
    }

    /// <summary>Builds the terminal block, which only logs that an item arrived.</summary>
    private ActionBlock<Thing> CreateEndBlock()
    {
        return new ActionBlock<Thing>(thing => { Debug.WriteLine("EndBlock"); });
    }

    /// <summary>Applies the MainBlock stage's change: sets the item's name.</summary>
    private void DoGeneralStuffToThing(Thing thing)
    {
        thing.Name = "I am thing";
    }

    /// <summary>
    /// Waits for <see cref="SimulatedWorkDelay"/> and then applies
    /// <see cref="DoGeneralStuffToThing"/>.
    /// </summary>
    private async Task DoGeneralStuffToThingAsync(Thing thing)
    {
        // Awaiting the delay (instead of blocking on it inside Task.Run) keeps
        // thread-pool threads free while the simulated work is "in progress".
        await Task.Delay(SimulatedWorkDelay).ConfigureAwait(false);
        DoGeneralStuffToThing(thing);
    }

    /// <summary>
    /// Records the current UTC time in the item's timestamp for the given branch.
    /// </summary>
    /// <param name="thing">Item to stamp.</param>
    /// <param name="which">Branch number: 1, 2 or 3.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="which"/> is not 1, 2 or 3.
    /// </exception>
    private void DoWorkOnThing(Thing thing, int which)
    {
        switch (which)
        {
            case 1:
                thing.ProcessedByBlock1 = DateTime.UtcNow;
                break;
            case 2:
                thing.ProcessedByBlock2 = DateTime.UtcNow;
                break;
            case 3:
                thing.ProcessedByBlock3 = DateTime.UtcNow;
                break;
            default:
                // Fail loudly rather than silently leaving the item unstamped.
                throw new ArgumentOutOfRangeException("which", which, "Branch number must be 1, 2 or 3.");
        }
    }

    /// <summary>
    /// Waits for <see cref="SimulatedWorkDelay"/> and then applies
    /// <see cref="DoWorkOnThing"/> for the given branch.
    /// </summary>
    private async Task DoWorkOnThingAsync(Thing thing, int which)
    {
        await Task.Delay(SimulatedWorkDelay).ConfigureAwait(false);
        DoWorkOnThing(thing, which);
    }

    /// <summary>Item that flows through the pipeline and records which stages touched it.</summary>
    public class Thing
    {
        /// <summary>Routing key: decides which branch MainBlock forwards the item to.</summary>
        public Status Status { get; set; }

        /// <summary>UTC time at which Block1 processed the item, or null if it did not.</summary>
        public DateTime? ProcessedByBlock1 { get; set; }

        /// <summary>UTC time at which Block2 processed the item, or null if it did not.</summary>
        public DateTime? ProcessedByBlock2 { get; set; }

        /// <summary>UTC time at which Block3 processed the item, or null if it did not.</summary>
        public DateTime? ProcessedByBlock3 { get; set; }

        /// <summary>Name assigned by the MainBlock stage.</summary>
        public string Name { get; set; }
    }

    /// <summary>Status values used to route a <see cref="Thing"/> to a branch.</summary>
    public enum Status
    {
        Complete,
        Cancelled,
        Delayed
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Runs one item with status Complete through each pipeline variant built by
/// <c>DataflowTestPipeline</c> and checks that Block1 stamped it by the time
/// EndBlock reports completion.
/// </summary>
[TestClass]
public class PipelineTests
{
    [TestMethod]
    public async Task TestDataFlowBlockPipeline_SynchronousUsingPropagateCompletion()
    {
        var pipeline = new DataflowTestPipeline();
        // Completion is propagated through DataflowLinkOptions on every link.
        pipeline.CreatePipelineSynchronousWithLinkOptions();

        await SendCompleteThingAndAssertBlock1RanAsync(pipeline);
    }

    [TestMethod]
    public async Task TestDataFlowBlockPipeline_SynchronousUsingConditionalContinueWithInTheBlock()
    {
        var pipeline = new DataflowTestPipeline();
        // Completion is propagated by conditional continuations registered inside MainBlock.
        pipeline.CreatePipelineSynchronousConditionalContinueWith();

        await SendCompleteThingAndAssertBlock1RanAsync(pipeline);
    }

    // This one always fails: blocks 2 and 3 "complete" and propagate to EndBlock
    // before Block1 completes.
    [TestMethod]
    public async Task TestDataFlowBlockPipeline_AsynchronousUsingPropagateCompletion()
    {
        var pipeline = new DataflowTestPipeline();
        // Completion is propagated through DataflowLinkOptions on every link.
        pipeline.CreatePipelineAsynchronousWithLinkOptions();

        await SendCompleteThingAndAssertBlock1RanAsync(pipeline);
    }

    [TestMethod]
    public async Task TestDataFlowBlockPipeline_AsynchronousUsingConditionalContinueWithInTheBlock()
    {
        var pipeline = new DataflowTestPipeline();
        // Completion is propagated by conditional continuations registered inside MainBlock.
        pipeline.CreatePipelineAsynchronousConditionalContinueWith();

        await SendCompleteThingAndAssertBlock1RanAsync(pipeline);
    }

    [TestMethod]
    public async Task TestDataFlowBlockPipeline_AsynchronousUsingTaskWhenAll()
    {
        var pipeline = new DataflowTestPipeline();
        pipeline.CreatePipelineAsynchronousTaskWhenAll();

        await SendCompleteThingAndAssertBlock1RanAsync(pipeline);
    }

    /// <summary>
    /// Shared scenario: posts a single Complete-status item to an already-built
    /// pipeline, completes MainBlock, awaits EndBlock, then asserts that Block1
    /// recorded a timestamp on the item.
    /// </summary>
    private static async Task SendCompleteThingAndAssertBlock1RanAsync(DataflowTestPipeline pipeline)
    {
        var item = new DataflowTestPipeline.Thing();
        item.Status = DataflowTestPipeline.Status.Complete;

        pipeline.MainBlock.Post(item);
        pipeline.MainBlock.Complete();
        await pipeline.EndBlock.Completion;

        Debug.WriteLine("Doing Assert");
        Assert.IsNotNull(item.ProcessedByBlock1);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment