Skip to content

Instantly share code, notes, and snippets.

@garrydzeng
Created February 15, 2015 06:05
Show Gist options
  • Save garrydzeng/05c9d5bcbd382e90b852 to your computer and use it in GitHub Desktop.
Save garrydzeng/05c9d5bcbd382e90b852 to your computer and use it in GitHub Desktop.
// Stephen Toub
// stoub@microsoft.com
//
// ManagedThreadPool.cs
// ThreadPool written in 100% managed code. Mimics the core functionality of
// the System.Threading.ThreadPool class.
//
// HISTORY:
// v1.0.1 - Disposes of items remaining in queue when the queue is emptied
// - Catches errors thrown during execution of delegates
// - Added reset to semaphore, called during empty queue
// - Catches errors when unable to dequeue delegates
// v1.0.0 - Original version
//
// August 27, 2002
// v1.0.1
// http://www.gotdotnet.com/community/usersamples/Default.aspx?query=ManagedThreadPool
#region Namespaces
using System;
using System.Threading;
using System.Collections;
#endregion
namespace MySoft.Threading
{
/// <summary>
/// Counting semaphore in the style of Dijkstra's P/V operations, built on
/// <see cref="Monitor"/>.
/// </summary>
/// <remarks>
/// All state is guarded by a private lock object rather than by the instance itself,
/// so code outside this class that happens to lock on a semaphore reference cannot
/// stall or deadlock <see cref="P"/>, <see cref="V"/> or <see cref="Reset"/>.
/// </remarks>
public class Semaphore
{
    #region Member Variables
    /// <summary>Private monitor object that guards <see cref="_count"/>.</summary>
    private readonly object _gate = new object();
    /// <summary>The number of units currently available to be taken.</summary>
    private int _count;
    #endregion
    #region Construction
    /// <summary>Initialize the semaphore as a binary semaphore (one unit available).</summary>
    public Semaphore()
        : this(1)
    {
    }
    /// <summary>Initialize the semaphore as a counting semaphore.</summary>
    /// <param name="count">
    /// Initial number of units available. Zero is allowed and means every call to
    /// <see cref="P"/> blocks until <see cref="V"/> is called.
    /// </param>
    /// <exception cref="ArgumentException">Thrown if <paramref name="count"/> is less than 0.</exception>
    public Semaphore(int count)
    {
        if (count < 0)
        {
            throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
        }
        _count = count;
    }
    #endregion
    #region Synchronization Operations
    /// <summary>V the semaphore (add 1 unit to it). Alias for <see cref="V"/>.</summary>
    public void AddOne() { V(); }
    /// <summary>P the semaphore (take out 1 unit from it). Alias for <see cref="P"/>.</summary>
    public void WaitOne() { P(); }
    /// <summary>
    /// P the semaphore (take out 1 unit from it), blocking indefinitely until a unit
    /// is available.
    /// </summary>
    public void P()
    {
        lock (_gate)
        {
            // Re-check the count after every wake-up: another thread may have taken
            // the unit first, and Reset wakes all waiters at once.
            while (_count <= 0)
            {
                Monitor.Wait(_gate);
            }
            _count--;
        }
    }
    /// <summary>V the semaphore (add 1 unit to it) and wake one waiting thread, if any.</summary>
    public void V()
    {
        lock (_gate)
        {
            // One unit was added, so waking a single waiter is sufficient.
            _count++;
            Monitor.Pulse(_gate);
        }
    }
    /// <summary>Resets the semaphore to the specified count. Should be used cautiously.</summary>
    /// <param name="count">
    /// The new number of available units. The value is not validated; a value of zero
    /// or less makes subsequent <see cref="P"/> calls block until enough
    /// <see cref="V"/> calls raise the count above zero.
    /// </param>
    public void Reset(int count)
    {
        lock (_gate)
        {
            _count = count;
            // Threads already blocked in P must learn that units became available;
            // without this they would sleep until the next V. Each waiter re-checks
            // the count, so waking more waiters than there are units is harmless.
            if (count > 0)
            {
                Monitor.PulseAll(_gate);
            }
        }
    }
    #endregion
}
/// <summary>
/// Managed thread pool. A fixed set of background worker threads is created when the
/// type is first used; each worker repeatedly takes queued callbacks and executes them.
/// </summary>
public class ManagedThreadPool
{
    #region Constants
    /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
    private const int _maxWorkerThreads = 100;
    #endregion
    #region Member Variables
    /// <summary>
    /// Queue of all the callbacks waiting to be executed. Every access is made while
    /// holding the lock on its <see cref="Queue.SyncRoot"/>.
    /// </summary>
    private static readonly Queue _waitingCallbacks;
    /// <summary>
    /// Used to signal that a worker thread is needed for processing. Note that multiple
    /// threads may be needed simultaneously and as such we use a semaphore instead of
    /// an auto reset event.
    /// </summary>
    private static readonly Semaphore _workerThreadNeeded;
    /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
    private static readonly ArrayList _workerThreads;
    /// <summary>Number of threads currently executing a callback.</summary>
    private static int _inUseThreads;
    #endregion
    #region Construction
    /// <summary>Initialize the thread pool and start all of its worker threads.</summary>
    static ManagedThreadPool()
    {
        // Create our thread stores; we handle synchronization ourselves
        // as we may run into situations where multiple operations need to be atomic.
        // We keep track of the threads we've created just for good measure; not actually
        // needed for any core functionality.
        _waitingCallbacks = new Queue();
        _workerThreads = new ArrayList();
        _inUseThreads = 0;
        // Create our "thread needed" signal; it starts at zero because no work is queued yet.
        _workerThreadNeeded = new Semaphore(0);
        // Create all of the worker threads
        for (int i = 0; i < _maxWorkerThreads; i++)
        {
            // Create a new thread and add it to the list of threads.
            Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
            _workerThreads.Add(newThread);
            // Configure the new thread and start it. Background threads do not
            // keep the process alive on their own.
            newThread.Name = "ManagedPoolThread #" + i.ToString();
            newThread.IsBackground = true;
            newThread.Start();
        }
    }
    #endregion
    #region Public Methods
    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when the thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback)
    {
        // Queue the delegate with no state
        QueueUserWorkItem(callback, null);
    }
    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when the thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <param name="state">
    /// The object that is passed to the delegate when serviced from the thread pool.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state)
    {
        // Fail fast on the caller's thread. A null delegate would otherwise only fail
        // later on a worker thread, where the error is swallowed and never reported.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }
        // Create a waiting callback that contains the delegate and its state.
        // Add it to the processing queue, and signal that data is waiting.
        WaitingCallback waiting = new WaitingCallback(callback, state);
        lock (_waitingCallbacks.SyncRoot)
        {
            _waitingCallbacks.Enqueue(waiting);
        }
        _workerThreadNeeded.AddOne();
    }
    /// <summary>
    /// Empties the work queue of any queued work items. Items that have not started
    /// executing are discarded, and any state object implementing
    /// <see cref="IDisposable"/> is disposed. Errors thrown while disposing are ignored.
    /// </summary>
    public static void EmptyQueue()
    {
        object[] discarded;
        lock (_waitingCallbacks.SyncRoot)
        {
            // Snapshot and clear atomically, then reset the number of worker threads
            // currently needed to 0 (there is nothing for threads to do).
            discarded = _waitingCallbacks.ToArray();
            _waitingCallbacks.Clear();
            _workerThreadNeeded.Reset(0);
        }
        // Dispose outside the lock so arbitrary Dispose code cannot block producers
        // or workers, and cannot invalidate an enumeration of the live queue.
        foreach (object item in discarded)
        {
            IDisposable disposable = ((WaitingCallback)item).State as IDisposable;
            if (disposable == null)
            {
                continue;
            }
            try
            {
                disposable.Dispose();
            }
            catch
            {
                // Best effort: one failing Dispose must not prevent the remaining
                // items from being disposed, and must not surface to the caller.
            }
        }
    }
    #endregion
    #region Properties
    /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
    public static int MaxThreads { get { return _maxWorkerThreads; } }
    /// <summary>Gets the number of currently active threads in the thread pool.</summary>
    public static int ActiveThreads { get { return _inUseThreads; } }
    /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
    public static int WaitingCallbacks
    {
        get
        {
            lock (_waitingCallbacks.SyncRoot)
            {
                return _waitingCallbacks.Count;
            }
        }
    }
    #endregion
    #region Thread Processing
    /// <summary>A thread worker function that processes items from the work queue.</summary>
    private static void ProcessQueuedItems()
    {
        // Process indefinitely
        while (true)
        {
            // Get the next item in the queue. If there is nothing there, go to sleep
            // until we're woken up when a callback is waiting.
            WaitingCallback callback = null;
            while (callback == null)
            {
                // We need to lock on the queue in order to make our count check and
                // retrieval atomic. With the count verified under the same lock,
                // Dequeue cannot find the queue empty.
                lock (_waitingCallbacks.SyncRoot)
                {
                    if (_waitingCallbacks.Count > 0)
                    {
                        callback = (WaitingCallback)_waitingCallbacks.Dequeue();
                    }
                }
                // If we can't get one, go to sleep. Being woken does not guarantee an
                // item is still available, which is why the loop checks the queue again.
                if (callback == null)
                {
                    _workerThreadNeeded.WaitOne();
                }
            }
            // We now have a callback. Execute it. Make sure to accurately
            // record how many callbacks are currently executing: the increment sits
            // outside the try so it is always paired with the decrement in finally.
            Interlocked.Increment(ref _inUseThreads);
            try
            {
                callback.Callback(callback.State);
            }
            catch
            {
                // Make sure we don't throw here; a failing work item must not take
                // down the worker thread. Errors are not our problem.
            }
            finally
            {
                Interlocked.Decrement(ref _inUseThreads);
            }
        }
    }
    #endregion
    /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
    private class WaitingCallback
    {
        #region Member Variables
        /// <summary>Callback delegate for the callback.</summary>
        private readonly WaitCallback _callback;
        /// <summary>State with which to call the callback delegate.</summary>
        private readonly object _state;
        #endregion
        #region Construction
        /// <summary>Initialize the callback holding object.</summary>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WaitingCallback(WaitCallback callback, object state)
        {
            _callback = callback;
            _state = state;
        }
        #endregion
        #region Properties
        /// <summary>Gets the callback delegate for the callback.</summary>
        public WaitCallback Callback { get { return _callback; } }
        /// <summary>Gets the state with which to call the callback delegate.</summary>
        public object State { get { return _state; } }
        #endregion
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment