@RupertAvery
Last active February 26, 2024 08:32
A wrapper class using Channels to asynchronously process a queue that can be added to at any time
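public class Job
{
    private static int lastId;

    public int Id { get; }

    // Duration in milliseconds
    public int Duration { get; }

    public Job(int value)
    {
        // Interlocked keeps IDs unique even if jobs are created from several threads
        Id = Interlocked.Increment(ref lastId);
        // 2 seconds per unit of value
        Duration = value * 2000;
    }
}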
using System.Threading.Channels;

public class Processor<T> where T : class
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
    private readonly int _degreeOfParallelism;

    public Processor(int degreeOfParallelism)
    {
        _degreeOfParallelism = degreeOfParallelism;
    }

    public async Task Add(T job)
    {
        await _channel.Writer.WriteAsync(job);
    }

    public void Complete()
    {
        // Marks the channel as complete: no more jobs can be added,
        // and the consumers stop once the remaining jobs are drained
        _channel.Writer.Complete();
    }

    public Task Start(Func<T, CancellationToken, Task> action, CancellationToken token)
    {
        // Start one consumer loop per degree of parallelism
        var consumers = new List<Task>();
        for (var i = 0; i < _degreeOfParallelism; i++)
        {
            consumers.Add(ProcessTask(token, action));
        }
        return Task.WhenAll(consumers);
    }

    private async Task ProcessTask(CancellationToken token, Func<T, CancellationToken, Task> action)
    {
        // WaitToReadAsync returns false once the channel is completed and empty
        while (await _channel.Reader.WaitToReadAsync(token))
        {
            // Use TryRead rather than ReadAsync: another consumer may already have
            // taken the item, and ReadAsync would then throw ChannelClosedException
            // if the channel is completed in the meantime
            while (_channel.Reader.TryRead(out var job))
            {
                try
                {
                    await action(job, token);
                }
                catch
                {
                    // handle exceptions
                }
            }
        }
    }
}
var cts = new CancellationTokenSource();
var p = new Processor<Job>(4);
bool running = true;
Console.WriteLine("Enter 1-9 to add jobs of different durations (x 2 seconds), or X to quit");
// Start processing on the thread pool, so we can interact with the user below
var task = Task.Run(async () =>
{
    var token = cts.Token;
    Console.WriteLine("Started processing");
    await p.Start(async (job, cancellationToken) =>
    {
        Console.WriteLine($"Started #{job.Id}");
        // Pass the token so the delay stops if processing is cancelled
        await Task.Delay(job.Duration, cancellationToken);
        Console.WriteLine($"Finished #{job.Id}");
    }, token);
    Console.WriteLine("Stopped processing");
});
while (running)
{
    var input = Console.ReadLine();
    if (int.TryParse(input, out var number))
    {
        await p.Add(new Job(number));
    }
    // Accept both "x" and "X", matching the prompt
    if (string.Equals(input, "x", StringComparison.OrdinalIgnoreCase))
    {
        p.Complete();
        running = false;
    }
}
Console.WriteLine("Waiting for tasks to complete");
// wait for all tasks to complete
await task;
Console.WriteLine("Done");
@RupertAvery

Sometimes you need to process stuff asynchronously, limited to X number of threads, and be able to add to a queue in an asynchronous manner also.

The sample program shows how to do this.

Once started, enter a few numbers: 1 = 2 secs, 2 = 4 secs, etc. You will see that it processes the first 4 items, then picks up the next items as each of the first four completes.
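
The "first 4, then the rest" behavior can be checked with a small experiment. The following is only a rough sketch, not the gist's code: `MeasurePeakConcurrency` is a hypothetical helper that runs short delays through a fixed number of consumer loops and records the highest number of jobs seen running at once. The 50 ms delay is an arbitrary stand-in for real work.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: pushes `jobs` items through `workers` consumer loops and
// returns the highest number of jobs observed running at the same time.
static async Task<int> MeasurePeakConcurrency(int workers, int jobs)
{
    var channel = Channel.CreateUnbounded<int>();
    for (var i = 0; i < jobs; i++)
    {
        channel.Writer.TryWrite(i); // succeeds on an open, unbounded channel
    }
    channel.Writer.Complete();

    var gate = new object();
    var running = 0;
    var peak = 0;

    async Task Consume()
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out _))
            {
                lock (gate)
                {
                    running++;
                    peak = Math.Max(peak, running);
                }
                await Task.Delay(50); // stand-in for real work
                lock (gate)
                {
                    running--;
                }
            }
        }
    }

    // One loop per worker; each loop handles at most one job at a time
    await Task.WhenAll(Enumerable.Range(0, workers).Select(_ => Consume()));
    return peak;
}

var peakSeen = await MeasurePeakConcurrency(workers: 4, jobs: 10);
Console.WriteLine($"Peak concurrent jobs: {peakSeen}");
```

Because each consumer loop awaits one job before reading the next, the peak can never exceed the number of loops, no matter how many jobs are queued.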

When you exit by pressing x, we call Complete so the channel stops looping once it runs out of jobs to process. We then await the Task we used to start processing, which waits for any pending jobs to finish.
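
To illustrate why calling Complete does not drop queued work, here is a minimal sketch, separate from the Processor class. `DrainAfterComplete` is a hypothetical helper: it queues a few integers, completes the writer, and only then starts reading. It assumes a single consumer and an unbounded channel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: writes 1..count, completes the channel, then returns
// the items the consumer loop saw before it exited.
static async Task<List<int>> DrainAfterComplete(int count)
{
    var channel = Channel.CreateUnbounded<int>();
    var seen = new List<int>();

    // Queue everything first, then complete: the items stay readable
    for (var i = 1; i <= count; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();

    // WaitToReadAsync keeps returning true while items remain,
    // and returns false once the completed channel is empty
    while (await channel.Reader.WaitToReadAsync())
    {
        while (channel.Reader.TryRead(out var item))
        {
            seen.Add(item);
        }
    }
    return seen;
}

var drained = await DrainAfterComplete(3);
Console.WriteLine(string.Join(", ", drained)); // prints "1, 2, 3"
```

Completing the writer only closes the channel to new items; the loop ends by itself once the last queued item has been read.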

@CharlieDigital

> limited to X number of threads

This may be misleading, since the code uses Task, which does not correspond to a discrete thread. The degreeOfParallelism may or may not cause the same number of threads to be provisioned by the thread pool.
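
One way to see the difference is to count the threads that actually resume a batch of tasks. This is an illustrative sketch only: `CountThreadsUsed` is a hypothetical helper, and the number it reports depends on the machine, the runtime, and the thread pool's state at the time, so no particular value should be expected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: runs `consumers` async operations concurrently and reports
// how many distinct threads resumed them after the await.
static async Task<int> CountThreadsUsed(int consumers)
{
    var threadIds = new ConcurrentDictionary<int, bool>();

    var tasks = Enumerable.Range(0, consumers).Select(async _ =>
    {
        // No thread is held while the delay is pending
        await Task.Delay(100);
        // The continuation runs on whichever thread picks it up
        threadIds.TryAdd(Environment.CurrentManagedThreadId, true);
    }).ToList();

    await Task.WhenAll(tasks);
    return threadIds.Count;
}

var used = await CountThreadsUsed(100);
Console.WriteLine($"100 concurrent tasks resumed on {used} thread(s)");
```

The count can be anywhere from 1 up to the number of tasks, which is why degreeOfParallelism is better read as a cap on concurrent jobs than as a thread count.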
