Created
March 25, 2022 14:25
-
-
Save darraghjones/e9e5e7538546ecc0a9cb92232427805a to your computer and use it in GitHub Desktop.
How do I schedule work on the ThreadPool without causing thread starvation?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Diagnostics; | |
// Switches for the phases simulated by DoWork.
const bool async = true; // include the awaited delay
const bool sync = true;  // include the CPU busy-wait phases

// Shared counters; the work items update them through Interlocked.
int concurrency = 0; // work items currently executing
int total = 0;       // work items that ran to completion
int dequeued = 0;    // work items handed out by GetWorkItemsFromQueue

// Measures the wall-clock time reported in the final summary line.
Stopwatch s = Stopwatch.StartNew();

// Cancellation is requested automatically 30 seconds after start-up.
CancellationTokenSource cts = new();
cts.CancelAfter(TimeSpan.FromSeconds(30));
// Monitor task: once a second, until cancellation is requested, prints a tab-separated line with
// process thread count, thread-pool thread count, pending work items, completed work items,
// the current concurrency counter and the dequeued counter.
var t = Task.Factory.StartNew(() =>
{
    // Process is IDisposable; release its handle when the monitor loop ends.
    using var process = Process.GetCurrentProcess();
    while (!cts.IsCancellationRequested)
    {
        // Process caches its data on first access; without Refresh() the
        // Threads collection (and therefore the printed count) never changes.
        process.Refresh();
        Console.WriteLine($"{process.Threads.Count}\t{ThreadPool.ThreadCount}\t{ThreadPool.PendingWorkItemCount}\t{ThreadPool.CompletedWorkItemCount}\t{concurrency}\t{dequeued}");
        Thread.Sleep(1000);
    }
}, TaskCreationOptions.LongRunning);
// Uncomment one of the scheduling strategies below to run it.
try
{
    //await TaskRun();
    //await TaskFactory();
    //await TaskFactoryAsync();
    //await TaskRunSlowly();
    //await ParallelForEach();
    //await ParallelForEachMax(int.MaxValue);
}
// OperationCanceledException is the base class of TaskCanceledException, so this
// swallows cancellation regardless of which of the two an awaited strategy surfaces.
catch (OperationCanceledException) { }

// Summary: number of completed work items and the elapsed wall-clock time.
Console.WriteLine($"Completed {total} work items in {s.ElapsedMilliseconds} ms");
// Strategy: start each dequeued work item through Task.Factory.StartNew on the default scheduler
// with LongRunning | PreferFairness | DenyChildAttach. The started delegate blocks until the
// work item's Task has finished. The started tasks are not awaited (fire-and-forget).
Task TaskFactory()
{
    const TaskCreationOptions creationOptions =
        TaskCreationOptions.LongRunning
        | TaskCreationOptions.PreferFairness
        | TaskCreationOptions.DenyChildAttach;

    foreach (var job in GetWorkItemsFromQueue())
    {
        _ = Task.Factory.StartNew(
            () => job().GetAwaiter().GetResult(),
            CancellationToken.None,
            creationOptions,
            TaskScheduler.Default);
    }

    return Task.CompletedTask;
}
// Strategy: hand every dequeued work item straight to Task.Run without awaiting it
// (fire-and-forget). Returns a completed task once the queue stops yielding items.
Task TaskRun()
{
    using var queue = GetWorkItemsFromQueue().GetEnumerator();
    while (queue.MoveNext())
    {
        _ = Task.Run(queue.Current);
    }

    return Task.CompletedTask;
}
// Strategy: like TaskRun, but before scheduling each item, wait (in 1 ms delays)
// until the thread pool reports at most 10 pending work items.
async Task TaskRunSlowly()
{
    const long maxPending = 10;

    foreach (var job in GetWorkItemsFromQueue())
    {
        // Throttle: back off while the pool's backlog exceeds the limit.
        while (ThreadPool.PendingWorkItemCount > maxPending)
        {
            await Task.Delay(1);
        }

        _ = Task.Run(job);
    }
}
// Strategy: let Parallel.ForEachAsync drive the queue with its default options,
// awaiting each work item inside the loop body.
async Task ParallelForEach()
{
    var queue = GetWorkItemsFromQueue();
    await Parallel.ForEachAsync(queue, async (job, _) => await job());
}
// Strategy: Parallel.ForEachAsync with an explicit degree of parallelism
// (maxConcurrency) and the shared cancellation token from cts.
async Task ParallelForEachMax(int maxConcurrency)
{
    ParallelOptions settings = new()
    {
        MaxDegreeOfParallelism = maxConcurrency,
        CancellationToken = cts.Token,
    };

    var queue = GetWorkItemsFromQueue();
    await Parallel.ForEachAsync(queue, settings, async (job, _) => await job());
}
// Simulated message handler (an approximation, not a measured workload):
// a CPU-bound spin until 100 ms have elapsed, an 800 ms awaited delay standing in
// for an external call, then another spin until 1000 ms have elapsed since the start.
// The concurrency counter is raised for the duration; total is bumped on completion.
async Task DoWork()
{
    var clock = Stopwatch.StartNew();

    // Burns CPU until the stopwatch (started on entry) reaches the given mark.
    void SpinUntilElapsed(long milliseconds)
    {
        while (clock.ElapsedMilliseconds < milliseconds)
        {
        }
    }

    Interlocked.Increment(ref concurrency);

    if (sync)
    {
        SpinUntilElapsed(100); // do some CPU intensive processing...
    }

    if (async)
    {
        await Task.Delay(800); // call an API or whatever....
    }

    if (sync)
    {
        SpinUntilElapsed(1000); // do more CPU intensive processing...
    }

    Interlocked.Decrement(ref concurrency);
    Interlocked.Increment(ref total);
}
// Endless lazy source of work items: yields DoWork (as a Func<Task>) until cancellation
// is requested on cts, incrementing the dequeued counter for every item handed out.
IEnumerable<Func<Task>> GetWorkItemsFromQueue()
{
    while (true)
    {
        if (cts.IsCancellationRequested)
        {
            yield break;
        }

        Interlocked.Increment(ref dequeued);
        yield return DoWork;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment