Last active
March 8, 2021 18:48
-
-
Save guitarrapc/556f744f6824f6664a903e8d54532ec9 to your computer and use it in GitHub Desktop.
Selected pool size execution pool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async Task Main() | |
{ | |
var sw = Stopwatch.StartNew(); | |
// pool size = 50, runnning for 20sec | |
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); | |
using var pool = new TaskPool<int>(50, cts.Token); | |
while (!cts.Token.IsCancellationRequested) | |
{ | |
pool.RegisterAsync(() => SendAsync(1)).FireAndForget(); | |
} | |
sw.Elapsed.Dump(); | |
Console.WriteLine($"Ops: {_count}"); | |
} | |
private int _count = 0; | |
public async Task<int> SendAsync(int n) | |
{ | |
await Task.Delay(TimeSpan.FromMilliseconds(n)); | |
Interlocked.Increment(ref _count); | |
return 1; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TaskPool<T> : IDisposable | |
{ | |
private readonly ConcurrentQueue<(TaskCompletionSource tcs, Task<T> task)> _pool = new ConcurrentQueue<(TaskCompletionSource, Task<T>)>(); | |
private readonly int _poolSize; | |
private readonly AutoResetEvent _autoResetEvent; | |
private readonly CancellationToken _ct; | |
private object _lock = new object(); | |
public TaskPool(int poolSize, CancellationToken ct) | |
{ | |
_poolSize = poolSize; | |
_autoResetEvent = new AutoResetEvent(false); | |
_ct = ct; | |
RunEngine().FireAndForget(); | |
} | |
public Task RegisterAsync(Func<Task<T>> item) | |
{ | |
lock (_lock) | |
{ | |
if (_pool.Count >= _poolSize) | |
_autoResetEvent.WaitOne(); | |
var tcs = new TaskCompletionSource(); | |
_pool.Enqueue((tcs, item.Invoke())); | |
return tcs.Task; | |
} | |
} | |
public void Dispose() | |
{ | |
_autoResetEvent?.Dispose(); | |
_pool.Clear(); | |
} | |
private async Task RunEngine() | |
{ | |
while (true) | |
{ | |
if (_ct.IsCancellationRequested) | |
return; | |
if (!_pool.TryPeek(out var item)) | |
{ | |
await Task.Delay(1); | |
continue; | |
} | |
try | |
{ | |
var result = await item.task.ConfigureAwait(false); | |
//Console.WriteLine($"complete queue {_pool.Count}"); | |
item.tcs.TrySetResult(); | |
} | |
catch (OperationCanceledException oex) | |
{ | |
Console.WriteLine("canceled"); | |
item.tcs.TrySetCanceled(); | |
} | |
catch (Exception ex) | |
{ | |
Console.WriteLine($"exception {ex.Message} {ex.GetType().FullName} {ex.StackTrace}"); | |
item.tcs.TrySetException(ex); | |
} | |
finally | |
{ | |
_pool.TryDequeue(out var _); | |
_autoResetEvent.Set(); | |
} | |
} | |
} | |
} | |
public static class TaskExtensions | |
{ | |
public static void FireAndForget(this Task task) | |
{ | |
task.ContinueWith(x => | |
{ | |
Console.WriteLine("TaskUnhandled", x.Exception); | |
}, TaskContinuationOptions.OnlyOnFaulted); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment