Skip to content

Instantly share code, notes, and snippets.

@mbolt35
Last active July 15, 2019 04:41
Show Gist options
  • Save mbolt35/60b467f7698b74bd122baefd16ae1ca1 to your computer and use it in GitHub Desktop.
Save mbolt35/60b467f7698b74bd122baefd16ae1ca1 to your computer and use it in GitHub Desktop.
Thread Starvation Demonstration
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Concurrency.Examples
{
/// <summary>
/// This class contains a simple producer/consumer model that demonstrates thread starvation
/// (at least a scaled down version). You can execute the example by including this source file
/// and running:
///
/// <code>
/// Concurrency.Examples.ThreadStarvation.RunThreadStarvation();
/// Console.ReadLine();
/// </code>
/// </summary>
public class ThreadStarvation
{
/// <summary>
/// Runs the thread starvation test synchronously with 20 threads for 60 seconds.
/// </summary>
public static void RunThreadStarvation(int totalThreads = 20)
=> RunThreadStarvation(TimeSpan.FromSeconds(60), totalThreads);
/// <summary>
/// Runs the thread starvation test synchronously for a specific timespan and total thread count.
/// </summary>
public static void RunThreadStarvation(TimeSpan totalTime, int totalThreads = 20)
{
var threadStarver = new ThreadStarvation(totalThreads);
var hostThread = new Thread(() => threadStarver.Run(totalTime))
{
IsBackground = true
};
hostThread.Start();
hostThread.Join();
}
/// <summary>
/// Creates a new thread, starts the thread, and blocks until the thread has started running.
/// This ensures that we start all consumer threads before starting our producer.
/// </summary>
private static Thread StartThread(ThreadStart start)
{
var wait = new ManualResetEventSlim(false);
var t = new Thread(() =>
{
wait.Set();
start();
})
{ IsBackground = true };
t.Start();
wait.Wait();
return t;
}
/// <summary>
/// Mutex used to lock shared resource
/// </summary>
private readonly object mutex = new object();
/// <summary>
/// Consumer threads.
/// </summary>
private Thread[] _consumers;
/// <summary>
/// Producer thread.
/// </summary>
private Thread _producer;
/// <summary>
/// Whether the threads should continue running or not.
/// </summary>
private bool _running = true;
/// <summary>
/// The shared resource. We can assume that the resource is available when flag is 1
/// and 0 when not available.
/// </summary>
private int _flag = 1;
/// <summary>
/// Keeps a count for all consumption threads and the total number of times they were able
/// to consume the resource.
/// </summary>
private readonly ConsumeCounter _counter = new ConsumeCounter();
/// <summary>
/// Creates a new <see cref="ThreadStarvation"/> instance.
/// </summary>
private ThreadStarvation(int totalThreads)
{
_consumers = new Thread[totalThreads];
}
/// <summary>
/// This method consumes the flag value when it resets to 1 and sets it back to 0.
/// </summary>
private void Consume()
{
// Create a '0' entry for the current thread
_counter.Reset();
// Each thread will enter this loop, acquire the lock, test flag for 1, and then
// block via Wait(). This will return the lock (allowing other consumption threads
// to acquire, test, block as well.
while (_running)
{
// Thread acquires lock here
Monitor.Enter(mutex);
try
{
// This loop is important because we will use a PulseAll() that will unblock
// all blocking threads. Because we only have exactly 1 shared resource at a time,
// we could simply replace PulseAll() with Pulse() and get the same effect (Pulse
// would unblock one of the threads instead of all). The loop ensures that all unblocked
// threads re-test the flag condition before breaking out of the loop.
while (_flag < 1)
{
// Blocks and waits for a signal (Pulse). It's important to note that once a blocking
// thread is no longer blocking, it reacquires the lock it released when it started
// blocking. Above, when we say "releases all threads that are blocking", it means
// we release each blocking thread one at a time, each reacquiring the lock.
Monitor.Wait(mutex);
}
// The first thread to see that the flag is 1 will break out of the while loop and reacquire the lock.
// This prevents any of the other consume threads from getting to the resource. This is why our counter
// Increment() doesn't require any additional locking, and also why we don't have to use Interlocked
// for updating the flag.
_flag = 0;
_counter.Increment();
}
finally
{
Monitor.Exit(mutex);
}
}
}
/// <summary>
/// This method produces a flipped flag value. In other words, it will set the flag to
/// 1 on an interval.
/// </summary>
private void Produce()
{
var random = new Random();
// Continue to set the flag at random intervals until running flag is flipped
while (_running)
{
// Sleep a random number of milliseconds between 50 and 200.
Thread.Sleep(random.Next(50, 200));
// Acquire lock (same lock as consume threads - we use the same lock because we are sharing
// the same resource).
Monitor.Enter(mutex);
try
{
// Set our resource
_flag = 1;
// Pulse all the blocking consume threads to unblock and reacquire the lock. Since our resource
// is a single value, the same result could be accomplished with Pulse(), but we can further demonstrate
// starvation as a process if we unblock all threads (unblocks, test condition, block again). Since
// the order is non-deterministic, it prevents any thread fairness policy in the CLR from moving starved
// threads artificially to the front of the line.
Monitor.PulseAll(mutex);
}
finally
{
Monitor.Exit(mutex);
}
}
}
/// <summary>
/// Runs the starvation example.
/// </summary>
public void Run(TimeSpan runFor)
{
// Create the configured number of consumer threads.
for (var i = 0; i < _consumers.Length; ++i)
{
_consumers[i] = StartThread(Consume);
}
// Create a new producer thread
_producer = StartThread(Produce);
// Run the test for a specific time interval
Thread.Sleep(runFor);
// Flip the running flag and join the producer thread
// NOTE: Consume threads will still be blocking, but that's ok for our example.
_running = false;
_producer.Join();
// Print the counter results to console
_counter.DumpResults();
}
}
/// <summary>
/// Keeps track of the number of times a specific thread was able to reacquire a lock and consume
/// a resource.
/// </summary>
public class ConsumeCounter
{
/// <summary>
/// The current thread id.
/// </summary>
private static string Tid => Thread.CurrentThread.ManagedThreadId.ToString();
/// <summary>
/// Holds ours counters
/// </summary>
private readonly Dictionary<string, int> _counts = new Dictionary<string, int>();
/// <summary>
/// Creates a new <see cref="ConsumeCounter"/>
/// </summary>
public ConsumeCounter() { }
/// <summary>
/// Resets the counter for the current thread.
/// </summary>
public void Reset()
{
// Important to note that we lock here specifically because our starvation
// test code doesn't explicitly lock before it initializes the consumption code.
lock (_counts)
{
_counts[Tid] = 0;
}
}
/// <summary>
/// Increments the count for the current thread.
/// </summary>
public void Increment()
{
_counts[Tid]++;
}
/// <summary>
/// Dumps the counter results for each thread id.
/// </summary>
public void DumpResults()
{
// Sort results from greatest to least
var list = _counts.ToList();
list.Sort((x, y) => y.Value - x.Value);
foreach (var kv in list)
{
Console.WriteLine($"Thread: {kv.Key}, Count: {kv.Value}");
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment