Skip to content

Instantly share code, notes, and snippets.

@darraghjones
Created March 25, 2022 14:25
Show Gist options
  • Save darraghjones/e9e5e7538546ecc0a9cb92232427805a to your computer and use it in GitHub Desktop.
Save darraghjones/e9e5e7538546ecc0a9cb92232427805a to your computer and use it in GitHub Desktop.
How do I schedule work on the ThreadPool without causing thread starvation
using System.Diagnostics;
const bool async = true;
const bool sync = true;
int concurrency = 0, total = 0, dequeued = 0;
var s = Stopwatch.StartNew();
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(30));
var t = Task.Factory.StartNew(() =>
{
var process = Process.GetCurrentProcess();
while (cts.IsCancellationRequested == false)
{
Console.WriteLine($"{process.Threads.Count}\t{ThreadPool.ThreadCount}\t{ThreadPool.PendingWorkItemCount}\t{ThreadPool.CompletedWorkItemCount}\t{concurrency}\t{dequeued}");
Thread.Sleep(1000);
}
}, TaskCreationOptions.LongRunning);
try
{
//await TaskRun();
//await TaskFactory();
//await TaskFactoryAsync();
//await TaskRunSlowly();
//await ParallelForEach();
//await ParallelForEachMax(int.MaxValue);
}
catch (TaskCanceledException) { }
Console.WriteLine($"Completed {total} work items in {s.ElapsedMilliseconds} ms");
Task TaskFactory()
{
foreach (var workItem in GetWorkItemsFromQueue())
{
Task.Factory.StartNew(() => workItem().GetAwaiter().GetResult(), CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}
return Task.CompletedTask;
}
Task TaskRun()
{
foreach (var workItem in GetWorkItemsFromQueue())
{
Task.Run(workItem);
}
return Task.CompletedTask;
}
async Task TaskRunSlowly()
{
foreach (var workItem in GetWorkItemsFromQueue())
{
while (ThreadPool.PendingWorkItemCount > 10)
{
await Task.Delay(1);
}
Task.Run(workItem);
}
}
async Task ParallelForEach()
{
await Parallel.ForEachAsync(GetWorkItemsFromQueue(), async (workItem, token) =>
{
await workItem();
});
}
async Task ParallelForEachMax(int maxConcurrency)
{
var options = new ParallelOptions
{
CancellationToken = cts.Token,
MaxDegreeOfParallelism = maxConcurrency
};
await Parallel.ForEachAsync(GetWorkItemsFromQueue(), options, async (workItem, token) =>
{
await workItem();
});
}
async Task DoWork() // try and simulate a realistic message handler...is this realistic? i have no idea really :/
{
var timer = Stopwatch.StartNew();
Interlocked.Increment(ref concurrency);
if (sync) while (timer.ElapsedMilliseconds < 100) ; // do some CPU intensive processing...
if (async) await Task.Delay(800); // call an API or whatever....
if (sync) while (timer.ElapsedMilliseconds < 1000) ; // do more CPU intensive processing...
Interlocked.Decrement(ref concurrency);
Interlocked.Increment(ref total);
}
IEnumerable<Func<Task>> GetWorkItemsFromQueue()
{
while (cts.IsCancellationRequested == false)
{
Interlocked.Increment(ref dequeued);
yield return DoWork;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment