Skip to content

Instantly share code, notes, and snippets.

@OmidID
Last active March 7, 2024 18:15
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save OmidID/da234a6cfbacebbd46defdb71c6cf95e to your computer and use it in GitHub Desktop.
Save OmidID/da234a6cfbacebbd46defdb71c6cf95e to your computer and use it in GitHub Desktop.
C# Task Pool
using System.Collections.Concurrent;
/// <summary>
/// Runs asynchronous work items with a bounded degree of concurrency: at most
/// <see cref="ThreadsMaxCount"/> items execute at the same time and the rest wait
/// in a first-in, first-out queue until a slot becomes free.
/// </summary>
/// <remarks>
/// https://gist.github.com/OmidID/da234a6cfbacebbd46defdb71c6cf95e
/// </remarks>
public class TaskPool(int threadsMaxCount)
{
    // Work items that currently hold an execution slot. Guarded by _tasksMutex.
    private readonly HashSet<IInternalTask> _workingTasks = [];

    // Work items waiting for a free slot, in arrival order. Mutated only under _tasksMutex
    // so that "queue empty and nothing running" can be evaluated atomically.
    private readonly ConcurrentQueue<IInternalTask> _queue = new();

    // The single lock protecting _workingTasks and _queue. User delegates and event
    // handlers are never invoked while it is held.
    private readonly object _tasksMutex = new();

    /// <summary>
    /// Maximum number of work items that may execute concurrently.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown during construction when the supplied value is zero or negative,
    /// because such a pool could never start any work.
    /// </exception>
    public int ThreadsMaxCount { get; } = threadsMaxCount > 0
        ? threadsMaxCount
        : throw new ArgumentOutOfRangeException(
            nameof(threadsMaxCount),
            threadsMaxCount,
            "The pool needs at least one execution slot.");

    /// <summary>
    /// A unit of work the pool can run without knowing its result type.
    /// </summary>
    private interface IInternalTask
    {
        Task ExecuteAsync();
    }

    /// <summary>
    /// Work item without a result. <see cref="Waiter"/> is null for items passed to the
    /// constructor, because nobody can await those individually.
    /// </summary>
    private sealed class InternalTaskHolder : IInternalTask
    {
        public required Func<Task> Task { get; init; }

        public required TaskCompletionSource<object?>? Waiter { get; init; }

        public async Task ExecuteAsync()
        {
            var waiter = Waiter;
            try
            {
                await Task();
            }
            catch (OperationCanceledException ex) when (waiter is not null)
            {
                // Surface cancellation to the awaiting caller instead of leaving it hanging.
                waiter.TrySetCanceled(ex.CancellationToken);
                return;
            }
            catch (Exception ex) when (waiter is not null)
            {
                // Forward the failure to the awaiting caller.
                waiter.TrySetException(ex);
                return;
            }

            // Without a waiter the filters above do not match, so a failure propagates into
            // the (discarded) runner task rather than being swallowed here.
            waiter?.TrySetResult(null);
        }
    }

    /// <summary>
    /// Work item that produces a value of type <typeparamref name="T"/>.
    /// </summary>
    private sealed class InternalTaskHolderGeneric<T> : IInternalTask
    {
        public required Func<Task<T>> Task { get; init; }

        public required TaskCompletionSource<T> Waiter { get; init; }

        public async Task ExecuteAsync()
        {
            try
            {
                Waiter.TrySetResult(await Task());
            }
            catch (OperationCanceledException ex)
            {
                Waiter.TrySetCanceled(ex.CancellationToken);
            }
            catch (Exception ex)
            {
                Waiter.TrySetException(ex);
            }
        }
    }

    /// <summary>
    /// Raised whenever the pool becomes idle, i.e. a work item finishes while nothing else
    /// is running or queued. Handlers are invoked outside the pool's internal lock.
    /// </summary>
    public event EventHandler? Completed;

    /// <summary>
    /// Creates a pool, queues <paramref name="tasks"/> and immediately starts as many of
    /// them as there are execution slots.
    /// </summary>
    /// <param name="threadsMaxCount">Maximum number of concurrently running work items.</param>
    /// <param name="tasks">Work items to run; their outcomes cannot be awaited individually.</param>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    /// <remarks>
    /// Work starts before the constructor returns, so a <see cref="Completed"/> handler
    /// attached afterwards will miss the notification if every item finishes synchronously.
    /// </remarks>
    public TaskPool(int threadsMaxCount, IEnumerable<Func<Task>> tasks)
        : this(threadsMaxCount)
    {
        ArgumentNullException.ThrowIfNull(tasks);

        foreach (var task in tasks)
        {
            _queue.Enqueue(new InternalTaskHolder
            {
                Task = task,
                Waiter = null
            });
        }

        CheckQueue();
    }

    /// <summary>
    /// Adds a work item and runs it as soon as an execution slot is free.
    /// </summary>
    /// <param name="task">Delegate that starts the asynchronous work.</param>
    /// <returns>
    /// A task that completes when the work item has finished, and that faults or is
    /// cancelled if the work item faults or is cancelled.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public Task EnqueueAsync(Func<Task> task)
    {
        ArgumentNullException.ThrowIfNull(task);

        var holder = new InternalTaskHolder
        {
            Task = task,
            // Asynchronous continuations keep the caller's code from running inline
            // before the finished item has released its slot.
            Waiter = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously)
        };

        Schedule(holder);
        return holder.Waiter.Task;
    }

    /// <summary>
    /// Adds a work item that produces a value and runs it as soon as an execution slot is free.
    /// </summary>
    /// <param name="task">Delegate that starts the asynchronous work.</param>
    /// <returns>
    /// A task that yields the work item's result, and that faults or is cancelled if the
    /// work item faults or is cancelled.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public Task<T> EnqueueAsync<T>(Func<Task<T>> task)
    {
        ArgumentNullException.ThrowIfNull(task);

        var holder = new InternalTaskHolderGeneric<T>
        {
            Task = task,
            Waiter = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously)
        };

        Schedule(holder);
        return holder.Waiter.Task;
    }

    /// <summary>
    /// Queues a work item and starts whatever now fits into the free slots. Going through
    /// the queue even when a slot is free keeps arrival order and guarantees that every
    /// started item is recorded in <c>_workingTasks</c>.
    /// </summary>
    private void Schedule(IInternalTask task)
    {
        List<IInternalTask> ready;
        lock (_tasksMutex)
        {
            _queue.Enqueue(task);
            ready = ClaimFreeSlots();
        }

        StartAll(ready);
    }

    /// <summary>
    /// Runs a work item and releases its slot afterwards, even when the item fails.
    /// </summary>
    /// <param name="task">The work item that should be executed.</param>
    private async Task StartTaskAsync(IInternalTask task)
    {
        try
        {
            await task.ExecuteAsync();
        }
        finally
        {
            TaskCompleted(task);
        }
    }

    /// <summary>
    /// Releases the slot held by <paramref name="task"/>, hands free slots to queued items
    /// and raises <see cref="Completed"/> if the pool has just become idle.
    /// </summary>
    private void TaskCompleted(IInternalTask task)
    {
        List<IInternalTask> ready;
        bool drained;
        lock (_tasksMutex)
        {
            _workingTasks.Remove(task);
            ready = ClaimFreeSlots();

            // Decided in the same critical section as the removal, so exactly one
            // finishing item observes the transition to idle.
            drained = _workingTasks.Count == 0 && _queue.IsEmpty;
        }

        StartAll(ready);

        if (drained)
        {
            OnCompleted();
        }
    }

    /// <summary>
    /// Starts as many queued work items as there are free execution slots.
    /// </summary>
    private void CheckQueue()
    {
        List<IInternalTask> ready;
        lock (_tasksMutex)
        {
            ready = ClaimFreeSlots();
        }

        StartAll(ready);
    }

    /// <summary>
    /// Moves queued work items into <c>_workingTasks</c> until the queue is empty or all
    /// slots are taken, and returns the moved items. Must be called with
    /// <c>_tasksMutex</c> held.
    /// </summary>
    private List<IInternalTask> ClaimFreeSlots()
    {
        var claimed = new List<IInternalTask>();
        while (_workingTasks.Count < ThreadsMaxCount && _queue.TryDequeue(out var next))
        {
            _workingTasks.Add(next);
            claimed.Add(next);
        }

        return claimed;
    }

    /// <summary>
    /// Starts the given work items. Called outside the lock so user delegates never run
    /// while the pool's state is locked.
    /// </summary>
    private void StartAll(List<IInternalTask> ready)
    {
        foreach (var item in ready)
        {
            // Fire and forget: the outcome is reported through the item's waiter, if any.
            _ = StartTaskAsync(item);
        }
    }

    /// <summary>
    /// Raises the <see cref="Completed"/> event.
    /// </summary>
    protected virtual void OnCompleted()
    {
        Completed?.Invoke(this, EventArgs.Empty);
    }
}
@yuzd
Copy link

yuzd commented Aug 5, 2020

Good

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment