Skip to content

Instantly share code, notes, and snippets.

@pitermarx
Created November 3, 2023 09:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pitermarx/00602b2e7189e1410ee75c2d6471952f to your computer and use it in GitHub Desktop.
Save pitermarx/00602b2e7189e1410ee75c2d6471952f to your computer and use it in GitHub Desktop.
An example on how to use channels
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
class Processor<T> where T : class
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private readonly Task processors;
public Processor(Func<T, Task> action, int numberOfConsumers = 4)
=> this.processors = Task.WhenAll(Enumerable.Range(0, numberOfConsumers).Select(_ => CreateProcessor(action)));
public ValueTask Add(T job) => _channel.Writer.WriteAsync(job);
public Task Complete() {
_channel.Writer.Complete();
return processors;
}
private async Task CreateProcessor(Func<T, Task> action) {
while (await _channel.Reader.WaitToReadAsync()) {
var job = await _channel.Reader.ReadAsync();
try {
await action(job);
}
catch {
// handle exceptions
}
}
}
}
var p = new Processor<string>(async id => {
Console.WriteLine($"Started #{id}");
await Task.Delay(5000);
Console.WriteLine($"Finished #{id}");
});
while (Console.ReadLine() is string input && input != "x") await p.Add(input);
await p.Complete();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment