Skip to content

Instantly share code, notes, and snippets.

@musukvl
Created June 17, 2020 09:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save musukvl/dcf1d08810317a0a6c28f55df9a79ccf to your computer and use it in GitHub Desktop.
Save musukvl/dcf1d08810317a0a6c28f55df9a79ccf to your computer and use it in GitHub Desktop.
Limited action pool async
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Common
{
public class LimitedLengthActionPool : IAsyncDisposable
{
private readonly SemaphoreSlim _semaphore;
private readonly ConcurrentDictionary<Guid, Task> _executingTasks = new ConcurrentDictionary<Guid, Task>();
public LimitedLengthActionPool(int parallelism = 0)
{
if (parallelism == 0)
{
parallelism = Environment.ProcessorCount - 1;
}
_semaphore = new SemaphoreSlim(parallelism);
}
public int Length
{
get { return _executingTasks.Count(x => !x.Value.IsCompleted); }
}
public async Task AddAction(Task task)
{
await _semaphore.WaitAsync();
RemoveCompletedTasks();
try
{
_executingTasks.TryAdd(Guid.NewGuid(), task);
}
finally
{
_semaphore.Release();
}
}
private void RemoveCompletedTasks()
{
var completed = _executingTasks.Where(x => x.Value.IsCompleted).Select(x => x.Key).ToArray();
if (completed.Any())
{
foreach (var item in completed)
{
_executingTasks.TryRemove(item, out var _);
}
}
}
public async ValueTask DisposeAsync()
{
await Task.WhenAll(_executingTasks.Values.ToArray());
_semaphore.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment