Skip to content

Instantly share code, notes, and snippets.

@badamczewski
Created May 7, 2012 23:02
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save badamczewski/7836efa2f2fe4bd210ba to your computer and use it in GitHub Desktop.
Task Programming model, build over a SmartThreadPool with SmartThreads that incorporate scheduling and work stealing.
#region Licence
// Copyright (c) 2012 BAX Services Bartosz Adamczewski
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use,
// copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
#endregion
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Reflection;
namespace SmartTasks
{
public delegate void Action();
/// <summary>
/// Heap implementation with very small subset of functionality.
/// </summary>
/// <remarks>
/// This heap structure is missing basic functionality as it was designed for
/// a very specific purpose and therefore it should not be considered usable
/// in any other case then SmartThreadPool.
/// </remarks>
/// <typeparam name="T">Comparible generic type.</typeparam>
public class Heap<T> where T : IComparable<T>
{
internal T[] items;
private T max;
private int size;
public bool IsBuilding
{
get;
private set;
}
/// <summary>
/// Initializes the heap that takes the input array, to construct the heap.
/// </summary>
/// <param name="array">Generic array.</param>
public Heap( T[] array )
{
this.items = array;
if ( array.Length != 0 )
max = this.items[ 0 ];
size = array.Length;
BuildHeap();
}
/// <summary>
/// Deletes the minimum element from the heap.
/// </summary>
/// <returns></returns>
public T DeleteMin()
{
//get min and ovveride it, with the next array element.
T min = items[ 0 ];
items[ 0 ] = items[ --size ];
//build the heap down.
BuildDown( 0 );
return min;
}
/// <summary>
/// Reads the minimum element from the heap.
/// </summary>
/// <returns>the minimum element.</returns>
public T ReadMin()
{
return items[ 0 ];
}
/// <summary>
/// Reads the maximum element from the heap.
/// </summary>
/// <returns>the maximum element.</returns>
public T ReadMax()
{
return max;
}
/// <summary>
/// Set's a new heap value.
/// </summary>
/// <param name="newSize">new size.</param>
public void SetSize( int newSize )
{
size = newSize;
}
/// <summary>
/// Get's the heap size.
/// </summary>
/// <returns>size.</returns>
public int GetSize()
{
return size;
}
/// <summary>
/// Inserts a new item into the heap.
/// </summary>
/// <param name="item">item to be added.</param>
public void Insert( T item )
{
if ( size == items.Length )
Expand();
int k = ++size - 1;
items[ k ] = item;
if ( max == null )
max = item;
BuildUp( k );
}
/// <summary>
/// Updates the specified value at k-th element.
/// </summary>
/// <param name="k">the element index to update.</param>
/// <param name="item">the specified item value.</param>
public void Update( int k, T item )
{
items[ k ] = item;
// start as a parent as check for build up is faster.
int childOrParrent = ( k - 1 ) / 2;
// first check if we should build up or down.
if ( CheckAndBuildUp( k, childOrParrent, item ) )
return;
childOrParrent = k * 2 + 1;
// check the left child.
if ( CheckAndBuildDown( k, childOrParrent, item ) )
return;
// check the right child.
childOrParrent++;
if ( CheckAndBuildDown( k, childOrParrent, item ) )
return;
}
/// <summary>
/// Builds the heap.
/// </summary>
public void BuildHeap()
{
IsBuilding = true;
for ( int k = size / 2; k >= 0; k-- )
BuildDown( k );
IsBuilding = false;
}
/// <summary>
/// Builds the heap down.
/// </summary>
/// <param name="k">the k-th element to start from.</param>
public void BuildDown( int k )
{
int child = k;
for ( ; 2 * child + 1 < size; k = child )
{
// local subtree root
T root = items[ k ];
// left child.
child = 2 * k + 1;
// if left child is bigger then the right then pick the right.
if ( child != size - 1 && items[ child ].CompareTo( items[ child + 1 ] ) > 0 )
child++;
// now compare with root
if ( root.CompareTo( items[ child ] ) > 0 )
{
if ( max.CompareTo( items[ child ] ) < 0 )
max = items[ child ];
// swamp
items[ k ] = items[ child ];
items[ child ] = root;
}
}
}
/// <summary>
/// Checks if k-th element is smaller then it's parent and builds up
/// if that yeilds true.
/// </summary>
/// <param name="k">k-th element.</param>
/// <param name="parent">the parent.</param>
/// <param name="item">the value of k-th element.</param>
/// <returns>boolean value indicating that the heap was build up.</returns>
private bool CheckAndBuildUp( int k, int parent, T item )
{
if ( parent > 0 && item.CompareTo( items[ parent ] ) < 0 )
{
items[ k ] = items[ parent ];
items[ parent ] = item;
BuildUp( parent );
return true;
}
return false;
}
/// <summary>
/// Checks if k-th element is bigger then it's parent and builds down
/// if that yeilds true.
/// </summary>
/// <param name="k">k-th element.</param>
/// <param name="parent">the parent.</param>
/// <param name="item">the value of k-th element.</param>
/// <returns>boolean value indicating that the heap was build down.</returns>
private bool CheckAndBuildDown( int k, int child, T item )
{
if ( child != items.Length && item.CompareTo( items[ child ] ) > 0 )
{
// swamp
items[ k ] = items[ child ];
items[ child ] = item;
BuildDown( child );
return true;
}
return false;
}
/// <summary>
/// Builds the heap up.
/// </summary>
/// <param name="k">the k-th element to start from.</param>
private void BuildUp( int k )
{
int parent = k;
for ( ; k > 1; k = parent )
{
// local subtree root
T root = items[ k ];
parent = ( k - 1 ) / 2;
// check my parent, if im smaller then him then swamp
if ( root.CompareTo( items[ parent ] ) > 0 )
{
if ( max.CompareTo( items[ parent ] ) <= 0 )
max = items[ parent ];
items[ k ] = items[ parent ];
items[ parent ] = root;
}
}
}
/// <summary>
/// Expands the heap array.
/// </summary>
private void Expand()
{
T[] oldItems = items;
int len = items.Length;
if ( len == 0 )
len = 1;
else
len *= 2;
items = new T[ len ];
if ( oldItems.Length != 0 )
Array.Copy( oldItems, items, size );
}
}
/// <summary>
/// Thread pool that incorprates fair thread scheduling as well as work stealing.
/// </summary>
public static class SmartThreadPool
{
/// <summary>
/// Gets/Sets the threadpool debug mode.
/// </summary>
/// <remarks>
/// When enabled upon scheduling a new work item, the thread pool
/// will log certain actions.
/// </remarks>
public static bool DebugModeOn
{
get;
set;
}
/// <summary>
/// Best case count thread count (best case means the lowest count).
/// </summary>
private static int bestThCnt;
/// <summary>
/// Locker object for any threads that want to put stuff in.
/// </summary>
private static readonly object locker = new object();
/// <summary>
/// Locker for artificial scheduler.
/// </summary>
private static readonly object artificialLocker = new object();
/// <summary>
/// Indicates that the artificial lock was taken by a thread
/// </summary>
private static bool artificialLockerTaken;
/// <summary>
/// Heap that represents a priority queue of worker threads.
/// </summary>
private static readonly Heap<SmartThread> threadScheduler = null;
/// <summary>
/// Heap that represents a priority queue of worker threads that werent created in the pool.
/// </summary>
private static readonly Heap<SmartThread> artificialThreadScheduler = null;
/// <summary>
/// A default static constructor that initializes the pool threads.
/// </summary>
static SmartThreadPool()
{
//create an empty heap.
artificialThreadScheduler = new Heap<SmartThread>( new SmartThread[] { } );
//the idea here is {core_count} * 2 the rest should be spawned as fibers.
//(fiber fuctionality comming soon :D, keep your hopes up kids!)
SmartThread[] threads = new SmartThread[ Environment.ProcessorCount * 2 ];
for ( int i = 0; i < threads.Length; i++ )
threads[ i ] = new SmartThread( true );
threadScheduler = new Heap<SmartThread>( threads );
}
/// <summary>
/// Queues a new Action on the thread pool.
/// </summary>
/// <param name="action">Action delegate.</param>
public static void QueueWorkItem( Task action )
{
SmartThread lowestWorkloadThread = null;
lock ( locker )
{
/*
* Accesing the root and it's direct children gives us better performance then just
* accesing the root. This is due that we don't lock the items on heap operations
* so while we reorganize the queue we might end up in a worse thread, but statistically
* when this happens better load statistics are in roots children.
*/
lowestWorkloadThread = threadScheduler.items[ bestThCnt++ % 3 ];
}
//If a thread is not started then do Start it.
if ( lowestWorkloadThread.IsStarted == false )
{
lowestWorkloadThread.Start();
}
//schedule a task.
lowestWorkloadThread.Execute( action );
// for debug only.
if ( DebugModeOn == true )
{
foreach ( SmartThread thread in threadScheduler.items )
{
Console.Write( string.Format( "t{0:000} a:{1:000} c:{2:000}; ",
thread.ThreadId, thread.AvgTaskTime, thread.scheduler.Count ) );
}
Console.WriteLine();
}
}
/// <summary>
/// Tries to steal work from the most loaded thread in the pool.
/// </summary>
/// <returns>SmartThread</returns>
/// <param name="threadPoolThread">boolean value idnicating that we want threads that
/// were created in a threadpool.</param>
/// <returns>SmartThread that has the most work to do.</returns>
internal static SmartThread GetThreadToSteal( bool threadPoolThread )
{
//get the element that has most of the load.
if ( threadPoolThread )
return threadScheduler.ReadMax();
return artificialThreadScheduler.ReadMax();
}
/// <summary>
/// Atempts to rebuild the pririty queue, to contain correct information
/// about priorities.
/// </summary>
/// <param name="threadPoolThread">boolean value idnicating that we want threads that
/// were created in a threadpool.</param>
internal static void Reschedule( bool threadPoolThread )
{
if ( threadPoolThread )
{
//we don't need to lock this section as generally we are ok
//with the race as it will not cause any errors but putting a simple
//flag arround it should be enough to prevent most races.
if ( threadScheduler.IsBuilding == false )
threadScheduler.BuildHeap();
return;
}
//if the lock was not taken then we proceed to lock it
//we use this as we don't want to block threads that build heap
//just the threads that insert and remove values from the heap.
if ( artificialLockerTaken == false )
artificialLockerTaken = Monitor.TryEnter( artificialLocker );
else
return;
if ( artificialThreadScheduler.IsBuilding == false )
artificialThreadScheduler.BuildHeap();
//we taken the lock so we need to exit.
if ( artificialLockerTaken == true )
Monitor.Exit( artificialLocker );
}
/// <summary>
/// Inserts a thread that not orginated in a thread pool to the artificial scheduler,
/// in order to enable fair work scheduling and work stealing.
/// </summary>
/// <param name="thread">SmartThread.</param>
internal static void InsertToArtificialScheduler( SmartThread thread )
{
lock ( artificialLocker )
{
artificialThreadScheduler.Insert( thread );
}
}
/// <summary>
/// Removes the given thread that not orginated in a thread pool from the artificial scheduler.
/// </summary>
/// <param name="thread">SmartThread</param>
internal static void RemoveFromArtificialScheduler( SmartThread thread )
{
lock ( artificialLocker )
{
int cnt = 0;
int size = artificialThreadScheduler.GetSize();
SmartThread[] artificialThreadItems = new SmartThread[ --size ];
//the linear search in this context should not be to big of a problem,
//as if we join then this means that probably our queue is empty so we should
//be somwhere in the top of the tree.
foreach ( SmartThread artificialThread in artificialThreadScheduler.items )
{
if ( artificialThread.ThreadId != thread.ThreadId )
{
artificialThreadItems[ cnt ] = artificialThread;
cnt++;
}
}
artificialThreadScheduler.SetSize( size );
artificialThreadScheduler.items = artificialThreadItems;
}
}
}
public enum TaskStatus
{
Error,
Running,
NotStarted,
Done
}
[global::System.Serializable]
public class TaskException : Exception
{
//
// For guidelines regarding the creation of new exception types, see
// http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
// and
// http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
//
public TaskException() { }
public TaskException( string message ) : base( message ) { }
public TaskException( string message, Exception inner ) : base( message, inner ) { }
protected TaskException(
System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context ) : base( info, context ) { }
}
/// <summary>
/// Represents a Task.
/// </summary>
public class Task
{
private ManualResetEvent wait = new ManualResetEvent( false );
private Action action;
internal TaskStatus status;
private TaskException ex;
/// <summary>
/// Initializes a task by passign the Action to be executed by it.
/// </summary>
/// <param name="action">action to be executed.</param>
public Task( Action action )
{
this.action = action;
status = TaskStatus.NotStarted;
}
/// <summary>
/// Blocks the current thread until the tasks work is done.
/// </summary>
public void Wait()
{
wait.WaitOne();
if ( status == TaskStatus.Error )
throw ex;
}
/// <summary>
/// Runs the task synchronously.
/// </summary>
public void RunSynchronously()
{
RunSynchronously( new Task[] { } );
}
internal TaskStatus InternalRun()
{
try
{
status = TaskStatus.Running;
action();
wait.Set();
status = TaskStatus.Done;
}
catch ( Exception ex )
{
//we need to capture the exception from other threads as we don't want to crash the
//pools threads.
status = TaskStatus.Error;
this.ex = new TaskException( "InternalRun", ex );
}
return status;
}
/// <summary>
/// Runs the task asynchronously.
/// </summary>
public void RunAsynchronously()
{
RunAsynchronously( new Task[] { } );
}
/// <summary>
/// Runs the task synchronously.
/// </summary>
/// <param name="tasksToWaitFor">the list of tasks to wait for, before running.</param>
public void RunSynchronously( params Task[] tasksToWaitFor )
{
WaitForCompletion( tasksToWaitFor );
status = TaskStatus.Running;
if ( InternalRun() == TaskStatus.Error )
throw ex;
}
/// <summary>
/// Runs the task asynchronously.
/// </summary>
/// <param name="tasksToWaitFor">the list of tasks to wait for, before running.</param>
public void RunAsynchronously( params Task[] tasksToWaitFor )
{
WaitForCompletion( tasksToWaitFor );
status = TaskStatus.Running;
SmartThreadPool.QueueWorkItem( this );
}
/// <summary>
/// Creates a task.
/// </summary>
/// <param name="action">The action to be eecuted by a task.</param>
/// <returns>Task.</returns>
public static Task Create( Action action )
{
return new Task( action );
}
private void WaitForCompletion( Task[] tasks )
{
foreach ( Task task in tasks )
task.Wait();
}
}
/// <summary>
/// A smart thread that incorporates work stealing tehniques.
/// </summary>
public class SmartThread : IComparable<SmartThread>
{
/// <summary>
/// Native API import function, that is used to Yeild a thread on
/// the same processor.
/// </summary>
/// <returns>the value indicating that the operation was successful.</returns>
[DllImport( "kernel32.dll" )]
static extern bool SwitchToThread();
private const int waitSpinLmit = 5;
private const int waitSpinTime = 30;
private readonly Thread thread;
internal readonly Queue<Task> scheduler;
private readonly ManualResetEvent wait;
internal readonly object locker = new object();
private bool isSignalled;
private bool isPendingJoin;
private int id;
private int executionCount;
private long totalTime;
private bool isInitializedInPool;
private int waitSpinCount = 0;
/// <summary>
/// Gets the ManagedThread Id.
/// </summary>
public int ThreadId
{
get { return id; }
}
/// <summary>
/// Gets the Average task running time.
/// </summary>
public decimal AvgTaskTime
{
get;
private set;
}
/// <summary>
/// Gets the TotalTaskTime.
/// </summary>
/// <remarks>
/// This value will get reseted over time, after it reaches the maximum int32 value.
/// </remarks>
public long TotalTasksTime
{
get { return totalTime; }
}
/// <summary>
/// Gets the value that indicated that the thread was created in the ThreadPool.
/// </summary>
public bool IsInitializedInPool
{
get { return isInitializedInPool; }
}
/// <summary>
/// An internal constructor that initializes a smart thread with a parameter
/// indicating that this insance is started in a threadpool.
/// </summary>
/// <param name="isInitializedInPool">Indicates that this instance will live in a threadpool.</param>
internal SmartThread( bool isInitializedInPool )
{
this.isInitializedInPool = isInitializedInPool;
thread = new Thread( new ThreadStart( Process ) );
wait = new ManualResetEvent( isSignalled );
scheduler = new Queue<Task>();
id = thread.ManagedThreadId;
}
/// <summary>
/// Initializes the SmartThread, in the non thread pool scope, therefor
/// this thread will not steal work.
/// </summary>
public SmartThread() : this( false ) { }
/// <summary>
/// The thread Processing loop, that consumes up the queue.
/// </summary>
private void Process()
{
Task localAction = null;
while ( true )
{
//check if our thread is in the signalled state,
//if that's true then reset it and continue work.
if ( isSignalled )
{
isSignalled = wait.Reset();
}
//TODO: use reader writer locks, or consider hardcore low level locks :D
Monitor.Enter( locker );
{
if ( scheduler.Count != 0 )
{
localAction = scheduler.Dequeue();
Monitor.Exit( locker );
}
else
{
//since there is little to do, then just let threads go.
Monitor.Exit( locker );
//lets try to steal some of this work.
bool workStolen = TryStealWork();
//if we stolen something then don't sleap and first check you work queue
//and then try to steal again (in case you haven't noticed this is a very subtle
//form of spin waiting ).
if ( workStolen )
continue;
if ( isPendingJoin )
break;
if ( waitSpinCount++ < waitSpinLmit )
{
//wait and spin, we wake up the thread as this is way more effective when executing
//multiple actions.
isSignalled = wait.WaitOne( waitSpinTime );
}
else
{
waitSpinCount = 0;
isSignalled = wait.WaitOne();
}
}
}
//Process the action outside the lock!
if ( localAction != null )
{
InvokeAction( localAction );
localAction = null;
//update the stats.
//TODO: Make it smarter, and more secure right now there is a very small deadlock chance!!!
SmartThreadPool.Reschedule( isInitializedInPool );
}
}
//end processing.
IsStarted = false;
wait.Close();
}
/// <summary>
/// Invokes the current action.
/// </summary>
/// <param name="localAction">localAction taken from queue.</param>
private void InvokeAction( Task localAction )
{
// start to measure time.
int ticksStart = System.Environment.TickCount;
// do execute the action.
localAction.InternalRun();
localAction.status = TaskStatus.Done;
//we do need to reset the stats so that they will not overflow.
if ( totalTime > int.MaxValue )
{
executionCount = 0;
totalTime = 0;
}
//increment the counter.
executionCount++;
totalTime += System.Environment.TickCount - ticksStart;
AvgTaskTime = totalTime / executionCount;
}
/// <summary>
/// Tries to steal workload from other heavy loaded threads.
/// </summary>
/// <returns>a boolan flag indicating the steal success or failure.</returns>
internal bool TryStealWork()
{
//1. Ask the pool for a thread with the worst stats.
//2. Access it's internal queue by calling count and then doing Dequeue
// Here we eith hold a lock or we dont lock at all and handle all queue empty exception.
SmartThread threadToSteal = SmartThreadPool.GetThreadToSteal( isInitializedInPool );
//This code is needed as ThreadPool might tell us that in some sittuations that we can steal
//work from ourselvs for e.g if other thread will join or we will fork ourselfs and the operations
//is running.
if ( threadToSteal.id != this.id )
{
Task localAction = null;
//lock on the thread we are stealing from.
lock ( threadToSteal.locker )
{
//perform a steal.
if ( threadToSteal.scheduler.Count != 0 )
{
localAction = threadToSteal.scheduler.Dequeue();
}
}
if ( localAction != null )
{
localAction.RunSynchronously();
return true;
}
else
return false;
}
return false;
}
/// <summary>
/// Schedules the current action for execution.
/// </summary>
/// <param name="action">the action to be executed.</param>
public void Execute( Task action )
{
scheduler.Enqueue( action );
isSignalled = wait.Set();
}
/// <summary>
/// Starts the current thread.
/// </summary>
public void Start()
{
if ( isInitializedInPool == false )
SmartThreadPool.InsertToArtificialScheduler( this );
thread.Start();
IsStarted = true;
}
/// <summary>
/// Joins (blocks and releases resources) the current thread.
/// </summary>
public void Join()
{
isPendingJoin = true;
}
/// <summary>
/// Gets the value that indicates if the thread is started.
/// </summary>
public bool IsStarted
{
get;
private set;
}
/// <summary>
/// Passes the execution (gives up the time slice) to another working thread
/// that's located on the same processor.
/// </summary>
/// <returns></returns>
public bool Yield()
{
//for 4.0 we get this for free.
#if NET40
return Thread.Yield();
#else
return SwitchToThread();
#endif
}
// Actually threads should not be comparable :(
// TODO: Create a comparable inner class.
public int CompareTo( SmartThread other )
{
return this.scheduler.Count - other.scheduler.Count;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment