Skip to content

Instantly share code, notes, and snippets.

@guitarrapc
Last active March 8, 2021 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save guitarrapc/556f744f6824f6664a903e8d54532ec9 to your computer and use it in GitHub Desktop.
Save guitarrapc/556f744f6824f6664a903e8d54532ec9 to your computer and use it in GitHub Desktop.
Execution pool with a selectable pool size.
/// <summary>
/// Demo driver: keeps registering 1 ms <c>SendAsync</c> calls into a 50-slot
/// <c>TaskPool</c> until a 20-second timeout fires, then prints the elapsed
/// time and the number of completed operations.
/// </summary>
async Task Main()
{
    var sw = Stopwatch.StartNew();

    // Pool size = 50, running for 20 sec.
    // The timeout constructor is timer-backed, so the source is disposed when Main exits.
    // Using declarations dispose in reverse order: the pool first, then the token source.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
    using var pool = new TaskPool<int>(50, cts.Token);

    // RegisterAsync blocks this thread while the pool is full, which throttles the loop.
    while (!cts.Token.IsCancellationRequested)
    {
        pool.RegisterAsync(() => SendAsync(1)).FireAndForget();
    }

    sw.Elapsed.Dump();
    Console.WriteLine($"Ops: {_count}");
}
// Number of SendAsync calls that have run to completion; updated via Interlocked.
private int _count;

/// <summary>
/// Simulated send operation: waits <paramref name="n"/> milliseconds and then
/// atomically increments the completion counter.
/// </summary>
/// <param name="n">Delay to wait, in milliseconds.</param>
/// <returns>A task whose result is always 1.</returns>
public async Task<int> SendAsync(int n)
{
    const int completed = 1;

    var delay = TimeSpan.FromMilliseconds(n);
    await Task.Delay(delay);

    Interlocked.Increment(ref _count);
    return completed;
}
/// <summary>
/// Bounded FIFO pool of already-started tasks. Producers call
/// <see cref="RegisterAsync"/>, which blocks while the pool holds
/// <c>poolSize</c> entries; a single background engine loop awaits the oldest
/// entry, completes its registration task, and frees the slot.
/// </summary>
/// <typeparam name="T">Result type of the pooled tasks (the result itself is discarded).</typeparam>
public class TaskPool<T> : IDisposable
{
    private readonly ConcurrentQueue<(TaskCompletionSource tcs, Task<T> task)> _pool = new ConcurrentQueue<(TaskCompletionSource, Task<T>)>();
    private readonly int _poolSize;
    private readonly AutoResetEvent _autoResetEvent;
    private readonly CancellationToken _ct;
    private readonly object _lock = new object();

    // 0 = live, 1 = disposed. Accessed through Interlocked/Volatile only.
    private int _disposed;

    /// <summary>
    /// Creates the pool and starts the background engine loop.
    /// </summary>
    /// <param name="poolSize">Maximum number of queued entries; must be positive.</param>
    /// <param name="ct">Token that stops the engine and unblocks waiting producers.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="poolSize"/> is zero or negative.</exception>
    public TaskPool(int poolSize, CancellationToken ct)
    {
        if (poolSize <= 0)
        {
            // A non-positive size would make every RegisterAsync call wait for a slot that can never exist.
            throw new ArgumentOutOfRangeException(nameof(poolSize), poolSize, "Pool size must be positive.");
        }

        _poolSize = poolSize;
        _autoResetEvent = new AutoResetEvent(false);
        _ct = ct;
        RunEngine().FireAndForget();
    }

    // True once the token is canceled or the pool is disposed.
    private bool IsStopped => _ct.IsCancellationRequested || Volatile.Read(ref _disposed) != 0;

    /// <summary>
    /// Starts <paramref name="item"/> and queues it, blocking the calling thread
    /// while the pool is full.
    /// </summary>
    /// <param name="item">Factory that starts the work; invoked while the pool lock is held.</param>
    /// <returns>
    /// A task that completes, faults, or is canceled when the engine has observed the
    /// outcome of the started work; an already-canceled task if the pool's token is canceled.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public Task RegisterAsync(Func<Task<T>> item)
    {
        if (item is null)
        {
            throw new ArgumentNullException(nameof(item));
        }

        lock (_lock)
        {
            while (true)
            {
                if (Volatile.Read(ref _disposed) != 0)
                {
                    throw new ObjectDisposedException(nameof(TaskPool<T>));
                }

                if (_ct.IsCancellationRequested)
                {
                    // The engine no longer runs, so nothing would ever complete a new entry.
                    return Task.FromCanceled(_ct);
                }

                if (_pool.Count < _poolSize)
                {
                    break;
                }

                // Re-check capacity after every wake-up (hence the loop): the event may carry a
                // stale signal from a slot freed while nobody was waiting. Waiting on the token's
                // handle as well lets cancellation unblock this thread.
                WaitHandle.WaitAny(new WaitHandle[] { _autoResetEvent, _ct.WaitHandle });
            }

            // Run awaiters' continuations asynchronously so they never execute inline on the engine loop.
            var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
            _pool.Enqueue((tcs, item.Invoke()));

            if (IsStopped)
            {
                // The pool stopped between the check above and the enqueue; the engine may already
                // have drained the queue, so resolve this registration here.
                tcs.TrySetCanceled();
            }

            return tcs.Task;
        }
    }

    /// <summary>
    /// Stops the engine loop, wakes a producer blocked in <see cref="RegisterAsync"/>,
    /// and releases the wait handle. Registrations still queued are canceled by the
    /// engine loop when it exits. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // Wake a producer blocked on a full pool; it will observe the disposed flag and throw.
        _autoResetEvent.Set();

        // Take the lock so the handle is only released once no producer is waiting on it.
        lock (_lock)
        {
            _autoResetEvent.Dispose();
        }
    }

    // Signals that a slot was freed, tolerating a concurrent Dispose.
    private void SignalSlotFreed()
    {
        if (Volatile.Read(ref _disposed) != 0)
        {
            return;
        }

        try
        {
            _autoResetEvent.Set();
        }
        catch (ObjectDisposedException)
        {
            // Dispose won the race; there is no producer left to wake.
        }
    }

    // Engine loop: awaits the oldest queued task, mirrors its outcome onto the
    // registration's completion source, then removes it and frees a slot.
    private async Task RunEngine()
    {
        try
        {
            while (!IsStopped)
            {
                if (!_pool.TryPeek(out var item))
                {
                    // Nothing queued; poll again shortly without capturing the caller's context.
                    await Task.Delay(1).ConfigureAwait(false);
                    continue;
                }

                try
                {
                    await item.task.ConfigureAwait(false);
                    //Console.WriteLine($"complete queue {_pool.Count}");
                    item.tcs.TrySetResult();
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("canceled");
                    item.tcs.TrySetCanceled();
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"exception {ex.Message} {ex.GetType().FullName} {ex.StackTrace}");
                    item.tcs.TrySetException(ex);
                }
                finally
                {
                    _pool.TryDequeue(out _);
                    SignalSlotFreed();
                }
            }
        }
        finally
        {
            // The engine is the only consumer; once it stops, cancel whatever is still queued
            // so tasks returned by RegisterAsync do not stay pending.
            while (_pool.TryDequeue(out var pending))
            {
                pending.tcs.TrySetCanceled();
            }
        }
    }
}
/// <summary>
/// Helpers for tasks whose result is intentionally not awaited.
/// </summary>
public static class TaskExtensions
{
    /// <summary>
    /// Lets <paramref name="task"/> run unobserved, writing its exception to the
    /// console if it faults. Successful or canceled tasks produce no output.
    /// </summary>
    /// <param name="task">The task to observe for faults.</param>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public static void FireAndForget(this Task task)
    {
        if (task is null)
        {
            throw new ArgumentNullException(nameof(task));
        }

        // The exception is interpolated into the message: passing it as a second argument
        // selects the composite-format overload, and with no "{0}" placeholder in the
        // format string the exception would never be printed.
        // TaskScheduler.Default keeps the continuation off whatever scheduler is current.
        _ = task.ContinueWith(
            x => Console.WriteLine($"TaskUnhandled {x.Exception}"),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment