Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@markheath
Created March 9, 2023 10:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save markheath/220119511fc392b45078230018e57cc8 to your computer and use it in GitHub Desktop.
Save markheath/220119511fc392b45078230018e57cc8 to your computer and use it in GitHub Desktop.
LINQPad demo of the producer consumer pattern with TPL dataflow including back-pressure
<Query Kind="Statements">
<Namespace>System.Collections.Concurrent</Namespace>
<Namespace>System.Threading.Tasks</Namespace>
<Namespace>System.Threading.Tasks.Dataflow</Namespace>
</Query>
var produceSpeed = TimeSpan.FromSeconds(0.5);
var produceCount = 20;
var consumeSpeed = TimeSpan.FromSeconds(2);
var maxParallelConsume = 4;
async Task ProduceAsync(ITargetBlock<string> target)
{
for (int i = 0; i < produceCount; i++)
{
var item = $"Item {i + 1}";
Console.WriteLine($"Producing {item}");
await Task.Delay(produceSpeed);
Console.WriteLine($"Produced {item}");
await target.SendAsync(item); // this will block if we have too many
}
target.Complete();
}
async Task ConsumeOneAsync(string message)
{
Console.WriteLine($"Consuming {message}");
await Task.Delay(consumeSpeed);
Console.WriteLine($"Consumed {message}");
}
async Task<int> ConsumeAllAsync(ISourceBlock<string> source)
{
int itemsProcessed = 0;
while (await source.OutputAvailableAsync())
{
var data = await source.ReceiveAsync();
await ConsumeOneAsync(data);
itemsProcessed++;
}
return itemsProcessed;
}
var backPressure = true;
if (backPressure)
{
// producer won't let us have more than 4 things in the queue at once
var buffer = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = maxParallelConsume });
var consumerBlock = new ActionBlock<string>(
message => ConsumeOneAsync(message),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
buffer.LinkTo(consumerBlock, new DataflowLinkOptions() { PropagateCompletion = true });
var producerTask = ProduceAsync(buffer);
await consumerBlock.Completion;
}
else
{
// SIMPLE IMPLEMENTATION:
var buffer = new BufferBlock<string>();
var consumerTask = ConsumeAllAsync(buffer);
var producerTask = ProduceAsync(buffer);
await Task.WhenAll(consumerTask, producerTask);
var itemsProcessed = consumerTask.Result;
itemsProcessed.Dump();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment