Created
February 15, 2015 06:05
-
-
Save garrydzeng/05c9d5bcbd382e90b852 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Stephen Toub
// stoub@microsoft.com
//
// ManagedThreadPool.cs
// ThreadPool written in 100% managed code. Mimics the core functionality of
// the System.Threading.ThreadPool class.
//
// HISTORY:
// v1.0.1 - Disposes of items remaining in queue when the queue is emptied
// - Catches errors thrown during execution of delegates
// - Added reset to semaphore, called during empty queue
// - Catches errors when unable to dequeue delegates
// v1.0.0 - Original version
//
// August 27, 2002
// v1.0.1
// http://www.gotdotnet.com/community/usersamples/Default.aspx?query=ManagedThreadPool
#region Namespaces | |
using System; | |
using System.Threading; | |
using System.Collections; | |
#endregion | |
namespace MySoft.Threading | |
{ | |
/// <summary>
/// Counting semaphore (Dijkstra's P/V operations) built on the <see cref="Monitor"/> class.
/// </summary>
/// <remarks>
/// <see cref="P"/> blocks while no unit is available and then takes one;
/// <see cref="V"/> returns one unit and wakes a single waiter.
/// All state is guarded by a private lock object so that code outside this
/// class cannot interfere with the monitor the semaphore waits on.
/// </remarks>
public class Semaphore
{
    #region Member Variables
    /// <summary>The number of units currently available to be taken.</summary>
    private int _count;

    /// <summary>Private monitor guarding <see cref="_count"/>; also the object waiters sleep on.</summary>
    private readonly object _syncRoot = new object();
    #endregion

    #region Construction
    /// <summary>Initialize the semaphore as a binary semaphore (one unit available).</summary>
    public Semaphore()
        : this(1)
    {
    }

    /// <summary>Initialize the semaphore as a counting semaphore.</summary>
    /// <param name="count">Initial number of units that can be taken out of this semaphore.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown if <paramref name="count"/> is less than 0. This type derives from
    /// <see cref="ArgumentException"/>, so existing handlers for that type still apply.
    /// </exception>
    public Semaphore(int count)
    {
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count", "Semaphore must have a count of at least 0.");
        }

        _count = count;
    }
    #endregion

    #region Synchronization Operations
    /// <summary>V the semaphore (add 1 unit to it). Alias for <see cref="V"/>.</summary>
    public void AddOne() { V(); }

    /// <summary>P the semaphore (take out 1 unit from it). Alias for <see cref="P"/>.</summary>
    public void WaitOne() { P(); }

    /// <summary>
    /// P the semaphore: block until at least one unit is available, then take it.
    /// </summary>
    public void P()
    {
        lock (_syncRoot)
        {
            // Re-check the count after every wake-up: another thread may have
            // taken the unit first, and Reset wakes every waiter at once.
            while (_count <= 0)
            {
                Monitor.Wait(_syncRoot);
            }

            _count--;
        }
    }

    /// <summary>
    /// V the semaphore: return one unit and wake one thread waiting in <see cref="P"/>, if any.
    /// </summary>
    public void V()
    {
        lock (_syncRoot)
        {
            _count++;
            Monitor.Pulse(_syncRoot);
        }
    }

    /// <summary>Resets the semaphore to the specified count. Should be used cautiously.</summary>
    /// <param name="count">
    /// The new number of available units. The value is stored as given; no range check is performed.
    /// </param>
    public void Reset(int count)
    {
        lock (_syncRoot)
        {
            _count = count;

            // Units may have just become available, so wake every waiter.
            // Each one re-checks the count inside P and goes back to sleep
            // if the units have already been taken.
            if (count > 0)
            {
                Monitor.PulseAll(_syncRoot);
            }
        }
    }
    #endregion
}
/// <summary>
/// Managed thread pool: a fixed set of background worker threads that execute
/// queued <see cref="WaitCallback"/> delegates.
/// </summary>
/// <remarks>
/// All worker threads are created and started by the static constructor.
/// Exceptions thrown by a queued callback are caught and discarded by the
/// worker that ran it, so a failing work item never terminates a pool thread.
/// </remarks>
public class ManagedThreadPool
{
    #region Constants
    /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
    private const int _maxWorkerThreads = 100;
    #endregion

    #region Member Variables
    /// <summary>
    /// Queue of all the callbacks waiting to be executed. Every access is made
    /// while holding the lock on the queue's SyncRoot.
    /// </summary>
    private static readonly Queue _waitingCallbacks;

    /// <summary>
    /// Used to signal that a worker thread is needed for processing. Note that multiple
    /// threads may be needed simultaneously and as such we use a semaphore instead of
    /// an auto reset event.
    /// </summary>
    private static readonly Semaphore _workerThreadNeeded;

    /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
    private static readonly ArrayList _workerThreads;

    /// <summary>Number of worker threads currently executing a callback.</summary>
    private static int _inUseThreads;
    #endregion

    #region Construction
    /// <summary>Initialize the thread pool and start all of its worker threads.</summary>
    static ManagedThreadPool()
    {
        // Create our thread stores; we handle synchronization ourselves
        // as we may run into situations where multiple operations need to be atomic.
        // We keep track of the threads we've created just for good measure; not actually
        // needed for any core functionality.
        _waitingCallbacks = new Queue();
        _workerThreads = new ArrayList();
        _inUseThreads = 0;

        // Create our "thread needed" signal; it starts at zero so that every
        // worker goes to sleep until the first item is queued.
        _workerThreadNeeded = new Semaphore(0);

        // Create all of the worker threads.
        for (int i = 0; i < _maxWorkerThreads; i++)
        {
            Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
            _workerThreads.Add(newThread);

            // Background threads do not keep the process alive on their own.
            newThread.Name = "ManagedPoolThread #" + i.ToString();
            newThread.IsBackground = true;
            newThread.Start();
        }
    }
    #endregion

    #region Public Methods
    /// <summary>Queues a user work item to the thread pool with no state object.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when a thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback)
    {
        QueueUserWorkItem(callback, null);
    }

    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when a thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <param name="state">
    /// The object that is passed to the delegate when serviced from the thread pool.
    /// If it is still queued when <see cref="EmptyQueue"/> runs and it implements
    /// <see cref="IDisposable"/>, it is disposed.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state)
    {
        // Reject a null delegate here, on the caller's thread. Otherwise the
        // failure would only surface on a worker thread, where it is swallowed.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        // Wrap the delegate and its state, add it to the processing queue,
        // and signal that data is waiting.
        WaitingCallback waiting = new WaitingCallback(callback, state);
        lock (_waitingCallbacks.SyncRoot)
        {
            _waitingCallbacks.Enqueue(waiting);
        }

        _workerThreadNeeded.AddOne();
    }

    /// <summary>
    /// Empties the work queue of any queued work items, disposing the state of
    /// each discarded item when that state implements <see cref="IDisposable"/>.
    /// </summary>
    public static void EmptyQueue()
    {
        lock (_waitingCallbacks.SyncRoot)
        {
            foreach (object obj in _waitingCallbacks)
            {
                // Guard each item separately so that one failing Dispose does
                // not leave the state of the remaining items undisposed.
                try
                {
                    IDisposable disposable = ((WaitingCallback)obj).State as IDisposable;
                    if (disposable != null)
                    {
                        disposable.Dispose();
                    }
                }
                catch
                {
                    // Best effort: emptying the queue must never throw.
                }
            }

            // Clear all waiting items and reset the number of worker threads currently needed
            // to be 0 (there is nothing for threads to do). This happens under the queue lock
            // so that no item can be enqueued between the clear and the reset.
            _waitingCallbacks.Clear();
            _workerThreadNeeded.Reset(0);
        }
    }
    #endregion

    #region Properties
    /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
    public static int MaxThreads { get { return _maxWorkerThreads; } }

    /// <summary>Gets the number of worker threads currently executing a callback.</summary>
    public static int ActiveThreads
    {
        get
        {
            // A compare-exchange that never changes the value gives a fenced read of
            // the counter that the workers update through Interlocked.
            return Interlocked.CompareExchange(ref _inUseThreads, 0, 0);
        }
    }

    /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
    public static int WaitingCallbacks
    {
        get
        {
            lock (_waitingCallbacks.SyncRoot)
            {
                return _waitingCallbacks.Count;
            }
        }
    }
    #endregion

    #region Thread Processing
    /// <summary>
    /// Worker thread body: repeatedly takes the next callback from the work queue
    /// and executes it, sleeping on the semaphore while the queue is empty.
    /// </summary>
    private static void ProcessQueuedItems()
    {
        // Process indefinitely.
        while (true)
        {
            // Get the next item in the queue. A semaphore signal is only a hint:
            // after every wake-up the queue is checked again, because another
            // worker may already have taken the item or the queue may have been emptied.
            WaitingCallback callback = null;
            while (callback == null)
            {
                // Lock on the queue so the count check and the retrieval are atomic.
                lock (_waitingCallbacks.SyncRoot)
                {
                    if (_waitingCallbacks.Count > 0)
                    {
                        try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); }
                        catch { } // make sure not to fail here
                    }
                }

                // If we can't get one, go to sleep until signalled.
                if (callback == null)
                {
                    _workerThreadNeeded.WaitOne();
                }
            }

            // We now have a callback. Count this thread as active before entering
            // the try block so the decrement in finally always pairs with it.
            Interlocked.Increment(ref _inUseThreads);
            try
            {
                callback.Callback(callback.State);
            }
            catch
            {
                // Make sure we don't throw here. Errors are not our problem.
            }
            finally
            {
                Interlocked.Decrement(ref _inUseThreads);
            }
        }
    }
    #endregion

    /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
    private sealed class WaitingCallback
    {
        #region Member Variables
        /// <summary>Callback delegate for the callback.</summary>
        private readonly WaitCallback _callback;

        /// <summary>State with which to call the callback delegate.</summary>
        private readonly object _state;
        #endregion

        #region Construction
        /// <summary>Initialize the callback holding object.</summary>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WaitingCallback(WaitCallback callback, object state)
        {
            _callback = callback;
            _state = state;
        }
        #endregion

        #region Properties
        /// <summary>Gets the callback delegate for the callback.</summary>
        public WaitCallback Callback { get { return _callback; } }

        /// <summary>Gets the state with which to call the callback delegate.</summary>
        public object State { get { return _state; } }
        #endregion
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment