Created
February 12, 2019 20:22
-
-
Save ignas-sakalauskas/6ceda6cec730e79d6f6bbdc99e3df5b9 to your computer and use it in GitHub Desktop.
TPL Dataflow Example in .NET Core 2.1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
namespace TplDataFlowPlayground
{
    /// <summary>
    /// Console sample that pushes integers from several concurrent producers through a
    /// TPL Dataflow pipeline: a bounded buffer, a transform step that doubles each value,
    /// and a final action step that prints the result.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Builds the pipeline, feeds it from three producers and waits until every
        /// message has been printed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        /// <returns>A task that completes once the last block has drained.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the input buffer declines a message.
        /// </exception>
        internal static async Task Main(string[] args)
        {
            Console.WriteLine($"CPU count: {Environment.ProcessorCount}");

            // Bounded capacity makes SendAsync wait when the buffer is full,
            // which throttles the producers.
            var inputBuffer = new BufferBlock<int>(new DataflowBlockOptions
            {
                BoundedCapacity = Environment.ProcessorCount
            });

            // Sends every value of one sequence to the buffer block asynchronously.
            async Task ProduceAsync(IEnumerable<int> values)
            {
                foreach (var value in values)
                {
                    // SendAsync yields false when the target declines the message;
                    // surface that instead of silently dropping the value.
                    var accepted = await inputBuffer.SendAsync(value);
                    if (!accepted)
                    {
                        throw new InvalidOperationException(
                            $"The input buffer declined value {value}.");
                    }
                }
            }

            // Runs all producers concurrently and completes the buffer block
            // once every one of them has finished.
            async Task MultipleProducersAsync(params IEnumerable<int>[] producers)
            {
                try
                {
                    await Task.WhenAll(producers.Select(values => ProduceAsync(values)));
                }
                finally
                {
                    // Complete in a finally block (rather than a ContinueWith continuation)
                    // so the pipeline still shuts down when a producer fails, while the
                    // producer's exception keeps propagating to the caller.
                    inputBuffer.Complete();
                }
            }

            // Transform block -- multiply each number by two
            var transformationBlock = new TransformBlock<int, int>(number => number * 2);

            // Action, no output, therefore the last block in the pipeline
            var actionBlock = new ActionBlock<int>(transformedNumber =>
            {
                Console.WriteLine($"Transformed result: {transformedNumber}");
            });

            // Link blocks together; PropagateCompletion forwards completion
            // from each block to the next one in the chain.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            inputBuffer.LinkTo(transformationBlock, linkOptions);
            transformationBlock.LinkTo(actionBlock, linkOptions);

            // IMPORTANT -- all links must be defined before sending input data.
            // Send input data
            await MultipleProducersAsync(
                new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 },
                new List<int> { 11, 22, 33, 44, 55, 66, 77, 88, 99, 1010 },
                new List<int> { 111, 222, 333, 444, 555, 666, 777, 888, 999, 101010 });

            // Wait for the last block in the pipeline to process all the messages
            await actionBlock.Completion;

            Console.WriteLine("Finished, exiting...");
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Project Sdk="Microsoft.NET.Sdk"> | |
<!-- Project file for the TPL Dataflow console sample. -->
<PropertyGroup> | |
<!-- Build an executable rather than a library. -->
<OutputType>Exe</OutputType> | |
<!-- NOTE(review): presumably set so the async Task Main entry point compiles on this SDK; confirm before removing. -->
<LangVersion>latest</LangVersion> | |
<TargetFramework>netcoreapp2.1</TargetFramework> | |
</PropertyGroup> | |
<ItemGroup> | |
<!-- Package matching the System.Threading.Tasks.Dataflow namespace imported by the sample. -->
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0"/> | |
</ItemGroup> | |
</Project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment