Skip to content

Instantly share code, notes, and snippets.

@dust63
Last active November 2, 2023 14:41
Show Gist options
  • Save dust63/c8b1931bb8068f30b8fcf2a73955a57e to your computer and use it in GitHub Desktop.
Save dust63/c8b1931bb8068f30b8fcf2a73955a57e to your computer and use it in GitHub Desktop.
Async parallel gated
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace System.Threading.Tasks.Helpers
{
/// <summary>
/// Gates asynchronous work behind a <see cref="SemaphoreSlim"/> so that only a limited
/// number of operations hold a slot at the same time.
/// </summary>
/// <remarks>
/// <para>
/// The overloads that accept <see cref="Task"/> instances receive tasks that were already created
/// by the caller. This class can only delay <em>awaiting</em> them until a slot is free; it cannot
/// delay work that the caller has already started. To limit how many operations actually run at
/// once, use the overloads that accept <see cref="Func{TResult}"/> factories: each factory is
/// invoked only after a slot has been acquired.
/// </para>
/// <para>
/// The cancellation token only cancels waiting for a slot. It is not passed to the gated work.
/// </para>
/// </remarks>
public class AsyncParallelWrapper
{
    private readonly SemaphoreSlim _semaphore;
    private readonly CancellationToken _cancellationToken;

    /// <summary>
    /// Initializes the wrapper.
    /// </summary>
    /// <param name="maxDegreeOfParallelism">Number of operations allowed to hold a slot at once. Must be at least 1.</param>
    /// <param name="cancellationToken">The cancellation token used to cancel waiting on the semaphore.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
    /// </exception>
    public AsyncParallelWrapper(int maxDegreeOfParallelism, CancellationToken cancellationToken = default)
    {
        // A semaphore created with zero slots would make every gated operation wait forever,
        // so reject it up front instead of hanging silently.
        if (maxDegreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism),
                maxDegreeOfParallelism,
                "The maximum degree of parallelism must be at least 1.");
        }

        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        _cancellationToken = cancellationToken;
    }

    /// <summary>
    /// Awaits the given tasks, each one only after a semaphore slot has been acquired.
    /// </summary>
    /// <param name="tasks">Tasks to await. They are already created by the caller; see the class remarks.</param>
    /// <returns>A task that completes when every gated task has completed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <c>null</c>.</exception>
    public Task RunInParallel(IEnumerable<Task> tasks)
    {
        if (tasks is null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        return Task.WhenAll(tasks.Select(task => InternalRun(task)));
    }

    /// <summary>
    /// Awaits the given tasks, each one only after a semaphore slot has been acquired,
    /// and collects their results.
    /// </summary>
    /// <typeparam name="T">Return type</typeparam>
    /// <param name="tasks">Tasks to await. They are already created by the caller; see the class remarks.</param>
    /// <returns>A task producing the results of all gated tasks.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <c>null</c>.</exception>
    public Task<T[]> RunInParallel<T>(IEnumerable<Task<T>> tasks)
    {
        if (tasks is null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        return Task.WhenAll(tasks.Select(task => InternalRun(task)));
    }

    /// <summary>
    /// Invokes the given factories, each one only after a semaphore slot has been acquired,
    /// and awaits the tasks they return.
    /// </summary>
    /// <param name="taskFactories">
    /// Delegates that start the work. A factory may be invoked on a different thread than the caller's.
    /// </param>
    /// <returns>A task that completes when every started operation has completed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="taskFactories"/> is <c>null</c>.</exception>
    public Task RunInParallel(IEnumerable<Func<Task>> taskFactories)
    {
        if (taskFactories is null)
        {
            throw new ArgumentNullException(nameof(taskFactories));
        }

        return Task.WhenAll(taskFactories.Select(factory => InternalRunFactory(factory)));
    }

    /// <summary>
    /// Invokes the given factories, each one only after a semaphore slot has been acquired,
    /// awaits the tasks they return and collects their results.
    /// </summary>
    /// <typeparam name="T">Return type</typeparam>
    /// <param name="taskFactories">
    /// Delegates that start the work. A factory may be invoked on a different thread than the caller's.
    /// </param>
    /// <returns>A task producing the results of all started operations.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="taskFactories"/> is <c>null</c>.</exception>
    public Task<T[]> RunInParallel<T>(IEnumerable<Func<Task<T>>> taskFactories)
    {
        if (taskFactories is null)
        {
            throw new ArgumentNullException(nameof(taskFactories));
        }

        return Task.WhenAll(taskFactories.Select(factory => InternalRunFactory(factory)));
    }

    /// <summary>
    /// Awaits a task while holding a semaphore slot.
    /// </summary>
    /// <param name="task">Task to await with the semaphore</param>
    /// <returns>An asynchronous task</returns>
    private async Task InternalRun(Task task)
    {
        // Acquire outside the try block: if the wait is cancelled no slot was taken,
        // so there is nothing to release.
        await _semaphore.WaitAsync(_cancellationToken).ConfigureAwait(false);
        try
        {
            await task.ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Awaits a task while holding a semaphore slot and returns its result.
    /// </summary>
    /// <typeparam name="T">Return type</typeparam>
    /// <param name="task">Task to await with the semaphore</param>
    /// <returns>An asynchronous task of <typeparamref name="T"/></returns>
    private async Task<T> InternalRun<T>(Task<T> task)
    {
        await _semaphore.WaitAsync(_cancellationToken).ConfigureAwait(false);
        try
        {
            return await task.ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Acquires a semaphore slot, then invokes the factory and awaits the task it returns.
    /// </summary>
    /// <param name="taskFactory">Delegate that starts the work</param>
    /// <returns>An asynchronous task</returns>
    private async Task InternalRunFactory(Func<Task> taskFactory)
    {
        await _semaphore.WaitAsync(_cancellationToken).ConfigureAwait(false);
        try
        {
            // The factory is invoked inside the try block so the slot is released
            // even when starting the work throws.
            await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Acquires a semaphore slot, then invokes the factory, awaits the task it returns
    /// and returns its result.
    /// </summary>
    /// <typeparam name="T">Return type</typeparam>
    /// <param name="taskFactory">Delegate that starts the work</param>
    /// <returns>An asynchronous task of <typeparamref name="T"/></returns>
    private async Task<T> InternalRunFactory<T>(Func<Task<T>> taskFactory)
    {
        await _semaphore.WaitAsync(_cancellationToken).ConfigureAwait(false);
        try
        {
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Awaits the given tasks with a maximum number of semaphore slots.
    /// </summary>
    /// <param name="tasks">Tasks to await. They are already created by the caller; see the class remarks.</param>
    /// <param name="maxDegreeOfParallelism">Number of operations allowed to hold a slot at once. Must be at least 1.</param>
    /// <param name="cancellationToken">The cancellation token used to cancel waiting on the semaphore.</param>
    /// <returns>A task that completes when every gated task has completed.</returns>
    public static Task RunInParallel(IEnumerable<Task> tasks, int maxDegreeOfParallelism, CancellationToken cancellationToken = default)
        => new AsyncParallelWrapper(maxDegreeOfParallelism, cancellationToken).RunInParallel(tasks);

    /// <summary>
    /// Awaits the given tasks with a maximum number of semaphore slots and collects their results.
    /// </summary>
    /// <typeparam name="T">Return type</typeparam>
    /// <param name="tasks">Tasks to await. They are already created by the caller; see the class remarks.</param>
    /// <param name="maxDegreeOfParallelism">Number of operations allowed to hold a slot at once. Must be at least 1.</param>
    /// <param name="cancellationToken">The cancellation token used to cancel waiting on the semaphore.</param>
    /// <returns>A task producing the results of all gated tasks.</returns>
    public static Task<T[]> RunInParallel<T>(IEnumerable<Task<T>> tasks, int maxDegreeOfParallelism, CancellationToken cancellationToken = default)
        => new AsyncParallelWrapper(maxDegreeOfParallelism, cancellationToken).RunInParallel(tasks);

    /// <summary>
    /// Starts and awaits the work produced by the given factories with a maximum degree of parallelism.
    /// </summary>
    /// <param name="taskFactories">Delegates that start the work; each is invoked only after a slot is acquired.</param>
    /// <param name="maxDegreeOfParallelism">Number of operations allowed to hold a slot at once. Must be at least 1.</param>
    /// <param name="cancellationToken">The cancellation token used to cancel waiting on the semaphore.</param>
    /// <returns>A task that completes when every started operation has completed.</returns>
    public static Task RunInParallel(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism, CancellationToken cancellationToken = default)
        => new AsyncParallelWrapper(maxDegreeOfParallelism, cancellationToken).RunInParallel(taskFactories);

    /// <summary>
    /// Starts and awaits the work produced by the given factories with a maximum degree of parallelism
    /// and collects the results.
    /// </summary>
    /// <typeparam name="T">Return type</typeparam>
    /// <param name="taskFactories">Delegates that start the work; each is invoked only after a slot is acquired.</param>
    /// <param name="maxDegreeOfParallelism">Number of operations allowed to hold a slot at once. Must be at least 1.</param>
    /// <param name="cancellationToken">The cancellation token used to cancel waiting on the semaphore.</param>
    /// <returns>A task producing the results of all started operations.</returns>
    public static Task<T[]> RunInParallel<T>(IEnumerable<Func<Task<T>>> taskFactories, int maxDegreeOfParallelism, CancellationToken cancellationToken = default)
        => new AsyncParallelWrapper(maxDegreeOfParallelism, cancellationToken).RunInParallel(taskFactories);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment