Skip to content

Instantly share code, notes, and snippets.

@ignas-sakalauskas
Created February 12, 2019 20:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ignas-sakalauskas/6ceda6cec730e79d6f6bbdc99e3df5b9 to your computer and use it in GitHub Desktop.
Save ignas-sakalauskas/6ceda6cec730e79d6f6bbdc99e3df5b9 to your computer and use it in GitHub Desktop.
TPL Dataflow Example in .NET Core 2.1
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace TplDataFlowPlayground
{
    /// <summary>
    /// Console demo of a three-stage TPL Dataflow pipeline:
    /// a bounded <see cref="BufferBlock{T}"/> fed by several concurrent producers,
    /// a <see cref="TransformBlock{TInput,TOutput}"/> that doubles each number,
    /// and an <see cref="ActionBlock{TInput}"/> that prints the result.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Builds the pipeline, pushes three sample sequences through it in parallel
        /// and waits until the last block has processed every message.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        /// <returns>A task that completes when the whole pipeline has drained.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the input buffer declines a message.
        /// </exception>
        internal static async Task Main(string[] args)
        {
            Console.WriteLine($"CPU count: {Environment.ProcessorCount}");

            var capacity = Environment.ProcessorCount;

            var inputBuffer = new BufferBlock<int>(new DataflowBlockOptions
            {
                BoundedCapacity = capacity
            });

            // Sends every value to the buffer block asynchronously.
            // SendAsync waits while the block is full, which throttles the producer.
            async Task Produce(IEnumerable<int> values)
            {
                foreach (var value in values)
                {
                    // SendAsync yields false when the block permanently declines the
                    // message; surface that instead of silently dropping the value.
                    var accepted = await inputBuffer.SendAsync(value);
                    if (!accepted)
                    {
                        throw new InvalidOperationException(
                            $"The input buffer declined value {value}.");
                    }
                }
            }

            // Runs all producers concurrently and signals completion to the buffer
            // once every producer has finished.
            async Task MultipleProducers(params IEnumerable<int>[] producers)
            {
                try
                {
                    await Task.WhenAll(producers.Select(values => Produce(values)));
                }
                finally
                {
                    // Always complete the buffer so the pipeline can drain, while
                    // still letting a producer exception propagate to the caller.
                    inputBuffer.Complete();
                }
            }

            // Downstream blocks are unbounded by default; without a bound they would
            // accept everything the buffer offers and the buffer's own limit would
            // provide little back-pressure on the producers.
            var stageOptions = new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = capacity
            };

            // Transform block -- multiply each number by two
            var transformationBlock = new TransformBlock<int, int>(
                number => number * 2,
                stageOptions);

            // Action, no output, therefore the last block in the pipeline
            var actionBlock = new ActionBlock<int>(
                transformedNumber => Console.WriteLine($"Transformed result: {transformedNumber}"),
                stageOptions);

            // Link blocks together; completion flows from each block to the next.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            inputBuffer.LinkTo(transformationBlock, linkOptions);
            transformationBlock.LinkTo(actionBlock, linkOptions);

            // IMPORTANT -- all links must be defined before sending input data.
            // Send input data
            await MultipleProducers(
                new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 },
                new List<int> { 11, 22, 33, 44, 55, 66, 77, 88, 99, 1010 },
                new List<int> { 111, 222, 333, 444, 555, 666, 777, 888, 999, 101010 });

            // Wait for the last block in the pipeline to process all the messages
            await actionBlock.Completion;
            Console.WriteLine("Finished, exiting...");
        }
    }
}
<Project Sdk="Microsoft.NET.Sdk">
<!-- Build settings for the TPL Dataflow console sample. -->
<PropertyGroup>
<!-- Exe: build a runnable console application rather than a library. -->
<OutputType>Exe</OutputType>
<!-- Use the newest C# version the installed compiler supports; the sample relies on an async Main and local functions. -->
<LangVersion>latest</LangVersion>
<!-- The sample targets .NET Core 2.1. -->
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<!-- NuGet package supplying the System.Threading.Tasks.Dataflow namespace (BufferBlock, TransformBlock, ActionBlock) used by the program. -->
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0"/>
</ItemGroup>
</Project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment