-
-
Save badamczewski/7836efa2f2fe4bd210ba to your computer and use it in GitHub Desktop.
Task Programming model, build over a SmartThreadPool with SmartThreads that incorporate scheduling and work stealing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#region Licence | |
// Copyright (c) 2012 BAX Services Bartosz Adamczewski | |
// | |
// Permission is hereby granted, free of charge, to any person | |
// obtaining a copy of this software and associated documentation | |
// files (the "Software"), to deal in the Software without | |
// restriction, including without limitation the rights to use, | |
// copy, modify, merge, publish, distribute, sublicense, and/or sell | |
// copies of the Software, and to permit persons to whom the | |
// Software is furnished to do so, subject to the following | |
// conditions: | |
// | |
// The above copyright notice and this permission notice shall be | |
// included in all copies or substantial portions of the Software. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES | |
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT | |
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR | |
// OTHER DEALINGS IN THE SOFTWARE. | |
#endregion | |
using System; | |
using System.Collections.Generic; | |
using System.Text; | |
using System.Threading; | |
using System.Runtime.InteropServices; | |
using System.Diagnostics; | |
using System.Reflection; | |
namespace SmartTasks | |
{ | |
public delegate void Action(); | |
/// <summary> | |
/// Heap implementation with very small subset of functionality. | |
/// </summary> | |
/// <remarks> | |
/// This heap structure is missing basic functionality as it was designed for | |
/// a very specific purpose and therefore it should not be considered usable | |
/// in any other case then SmartThreadPool. | |
/// </remarks> | |
/// <typeparam name="T">Comparible generic type.</typeparam> | |
public class Heap<T> where T : IComparable<T> | |
{ | |
internal T[] items; | |
private T max; | |
private int size; | |
public bool IsBuilding | |
{ | |
get; | |
private set; | |
} | |
/// <summary> | |
/// Initializes the heap that takes the input array, to construct the heap. | |
/// </summary> | |
/// <param name="array">Generic array.</param> | |
public Heap( T[] array ) | |
{ | |
this.items = array; | |
if ( array.Length != 0 ) | |
max = this.items[ 0 ]; | |
size = array.Length; | |
BuildHeap(); | |
} | |
/// <summary> | |
/// Deletes the minimum element from the heap. | |
/// </summary> | |
/// <returns></returns> | |
public T DeleteMin() | |
{ | |
//get min and ovveride it, with the next array element. | |
T min = items[ 0 ]; | |
items[ 0 ] = items[ --size ]; | |
//build the heap down. | |
BuildDown( 0 ); | |
return min; | |
} | |
/// <summary> | |
/// Reads the minimum element from the heap. | |
/// </summary> | |
/// <returns>the minimum element.</returns> | |
public T ReadMin() | |
{ | |
return items[ 0 ]; | |
} | |
/// <summary> | |
/// Reads the maximum element from the heap. | |
/// </summary> | |
/// <returns>the maximum element.</returns> | |
public T ReadMax() | |
{ | |
return max; | |
} | |
/// <summary> | |
/// Set's a new heap value. | |
/// </summary> | |
/// <param name="newSize">new size.</param> | |
public void SetSize( int newSize ) | |
{ | |
size = newSize; | |
} | |
/// <summary> | |
/// Get's the heap size. | |
/// </summary> | |
/// <returns>size.</returns> | |
public int GetSize() | |
{ | |
return size; | |
} | |
/// <summary> | |
/// Inserts a new item into the heap. | |
/// </summary> | |
/// <param name="item">item to be added.</param> | |
public void Insert( T item ) | |
{ | |
if ( size == items.Length ) | |
Expand(); | |
int k = ++size - 1; | |
items[ k ] = item; | |
if ( max == null ) | |
max = item; | |
BuildUp( k ); | |
} | |
/// <summary> | |
/// Updates the specified value at k-th element. | |
/// </summary> | |
/// <param name="k">the element index to update.</param> | |
/// <param name="item">the specified item value.</param> | |
public void Update( int k, T item ) | |
{ | |
items[ k ] = item; | |
// start as a parent as check for build up is faster. | |
int childOrParrent = ( k - 1 ) / 2; | |
// first check if we should build up or down. | |
if ( CheckAndBuildUp( k, childOrParrent, item ) ) | |
return; | |
childOrParrent = k * 2 + 1; | |
// check the left child. | |
if ( CheckAndBuildDown( k, childOrParrent, item ) ) | |
return; | |
// check the right child. | |
childOrParrent++; | |
if ( CheckAndBuildDown( k, childOrParrent, item ) ) | |
return; | |
} | |
/// <summary> | |
/// Builds the heap. | |
/// </summary> | |
public void BuildHeap() | |
{ | |
IsBuilding = true; | |
for ( int k = size / 2; k >= 0; k-- ) | |
BuildDown( k ); | |
IsBuilding = false; | |
} | |
/// <summary> | |
/// Builds the heap down. | |
/// </summary> | |
/// <param name="k">the k-th element to start from.</param> | |
public void BuildDown( int k ) | |
{ | |
int child = k; | |
for ( ; 2 * child + 1 < size; k = child ) | |
{ | |
// local subtree root | |
T root = items[ k ]; | |
// left child. | |
child = 2 * k + 1; | |
// if left child is bigger then the right then pick the right. | |
if ( child != size - 1 && items[ child ].CompareTo( items[ child + 1 ] ) > 0 ) | |
child++; | |
// now compare with root | |
if ( root.CompareTo( items[ child ] ) > 0 ) | |
{ | |
if ( max.CompareTo( items[ child ] ) < 0 ) | |
max = items[ child ]; | |
// swamp | |
items[ k ] = items[ child ]; | |
items[ child ] = root; | |
} | |
} | |
} | |
/// <summary> | |
/// Checks if k-th element is smaller then it's parent and builds up | |
/// if that yeilds true. | |
/// </summary> | |
/// <param name="k">k-th element.</param> | |
/// <param name="parent">the parent.</param> | |
/// <param name="item">the value of k-th element.</param> | |
/// <returns>boolean value indicating that the heap was build up.</returns> | |
private bool CheckAndBuildUp( int k, int parent, T item ) | |
{ | |
if ( parent > 0 && item.CompareTo( items[ parent ] ) < 0 ) | |
{ | |
items[ k ] = items[ parent ]; | |
items[ parent ] = item; | |
BuildUp( parent ); | |
return true; | |
} | |
return false; | |
} | |
/// <summary> | |
/// Checks if k-th element is bigger then it's parent and builds down | |
/// if that yeilds true. | |
/// </summary> | |
/// <param name="k">k-th element.</param> | |
/// <param name="parent">the parent.</param> | |
/// <param name="item">the value of k-th element.</param> | |
/// <returns>boolean value indicating that the heap was build down.</returns> | |
private bool CheckAndBuildDown( int k, int child, T item ) | |
{ | |
if ( child != items.Length && item.CompareTo( items[ child ] ) > 0 ) | |
{ | |
// swamp | |
items[ k ] = items[ child ]; | |
items[ child ] = item; | |
BuildDown( child ); | |
return true; | |
} | |
return false; | |
} | |
/// <summary> | |
/// Builds the heap up. | |
/// </summary> | |
/// <param name="k">the k-th element to start from.</param> | |
private void BuildUp( int k ) | |
{ | |
int parent = k; | |
for ( ; k > 1; k = parent ) | |
{ | |
// local subtree root | |
T root = items[ k ]; | |
parent = ( k - 1 ) / 2; | |
// check my parent, if im smaller then him then swamp | |
if ( root.CompareTo( items[ parent ] ) > 0 ) | |
{ | |
if ( max.CompareTo( items[ parent ] ) <= 0 ) | |
max = items[ parent ]; | |
items[ k ] = items[ parent ]; | |
items[ parent ] = root; | |
} | |
} | |
} | |
/// <summary> | |
/// Expands the heap array. | |
/// </summary> | |
private void Expand() | |
{ | |
T[] oldItems = items; | |
int len = items.Length; | |
if ( len == 0 ) | |
len = 1; | |
else | |
len *= 2; | |
items = new T[ len ]; | |
if ( oldItems.Length != 0 ) | |
Array.Copy( oldItems, items, size ); | |
} | |
} | |
/// <summary> | |
/// Thread pool that incorprates fair thread scheduling as well as work stealing. | |
/// </summary> | |
public static class SmartThreadPool | |
{ | |
/// <summary> | |
/// Gets/Sets the threadpool debug mode. | |
/// </summary> | |
/// <remarks> | |
/// When enabled upon scheduling a new work item, the thread pool | |
/// will log certain actions. | |
/// </remarks> | |
public static bool DebugModeOn | |
{ | |
get; | |
set; | |
} | |
/// <summary> | |
/// Best case count thread count (best case means the lowest count). | |
/// </summary> | |
private static int bestThCnt; | |
/// <summary> | |
/// Locker object for any threads that want to put stuff in. | |
/// </summary> | |
private static readonly object locker = new object(); | |
/// <summary> | |
/// Locker for artificial scheduler. | |
/// </summary> | |
private static readonly object artificialLocker = new object(); | |
/// <summary> | |
/// Indicates that the artificial lock was taken by a thread | |
/// </summary> | |
private static bool artificialLockerTaken; | |
/// <summary> | |
/// Heap that represents a priority queue of worker threads. | |
/// </summary> | |
private static readonly Heap<SmartThread> threadScheduler = null; | |
/// <summary> | |
/// Heap that represents a priority queue of worker threads that werent created in the pool. | |
/// </summary> | |
private static readonly Heap<SmartThread> artificialThreadScheduler = null; | |
/// <summary> | |
/// A default static constructor that initializes the pool threads. | |
/// </summary> | |
static SmartThreadPool() | |
{ | |
//create an empty heap. | |
artificialThreadScheduler = new Heap<SmartThread>( new SmartThread[] { } ); | |
//the idea here is {core_count} * 2 the rest should be spawned as fibers. | |
//(fiber fuctionality comming soon :D, keep your hopes up kids!) | |
SmartThread[] threads = new SmartThread[ Environment.ProcessorCount * 2 ]; | |
for ( int i = 0; i < threads.Length; i++ ) | |
threads[ i ] = new SmartThread( true ); | |
threadScheduler = new Heap<SmartThread>( threads ); | |
} | |
/// <summary> | |
/// Queues a new Action on the thread pool. | |
/// </summary> | |
/// <param name="action">Action delegate.</param> | |
public static void QueueWorkItem( Task action ) | |
{ | |
SmartThread lowestWorkloadThread = null; | |
lock ( locker ) | |
{ | |
/* | |
* Accesing the root and it's direct children gives us better performance then just | |
* accesing the root. This is due that we don't lock the items on heap operations | |
* so while we reorganize the queue we might end up in a worse thread, but statistically | |
* when this happens better load statistics are in roots children. | |
*/ | |
lowestWorkloadThread = threadScheduler.items[ bestThCnt++ % 3 ]; | |
} | |
//If a thread is not started then do Start it. | |
if ( lowestWorkloadThread.IsStarted == false ) | |
{ | |
lowestWorkloadThread.Start(); | |
} | |
//schedule a task. | |
lowestWorkloadThread.Execute( action ); | |
// for debug only. | |
if ( DebugModeOn == true ) | |
{ | |
foreach ( SmartThread thread in threadScheduler.items ) | |
{ | |
Console.Write( string.Format( "t{0:000} a:{1:000} c:{2:000}; ", | |
thread.ThreadId, thread.AvgTaskTime, thread.scheduler.Count ) ); | |
} | |
Console.WriteLine(); | |
} | |
} | |
/// <summary> | |
/// Tries to steal work from the most loaded thread in the pool. | |
/// </summary> | |
/// <returns>SmartThread</returns> | |
/// <param name="threadPoolThread">boolean value idnicating that we want threads that | |
/// were created in a threadpool.</param> | |
/// <returns>SmartThread that has the most work to do.</returns> | |
internal static SmartThread GetThreadToSteal( bool threadPoolThread ) | |
{ | |
//get the element that has most of the load. | |
if ( threadPoolThread ) | |
return threadScheduler.ReadMax(); | |
return artificialThreadScheduler.ReadMax(); | |
} | |
/// <summary> | |
/// Atempts to rebuild the pririty queue, to contain correct information | |
/// about priorities. | |
/// </summary> | |
/// <param name="threadPoolThread">boolean value idnicating that we want threads that | |
/// were created in a threadpool.</param> | |
internal static void Reschedule( bool threadPoolThread ) | |
{ | |
if ( threadPoolThread ) | |
{ | |
//we don't need to lock this section as generally we are ok | |
//with the race as it will not cause any errors but putting a simple | |
//flag arround it should be enough to prevent most races. | |
if ( threadScheduler.IsBuilding == false ) | |
threadScheduler.BuildHeap(); | |
return; | |
} | |
//if the lock was not taken then we proceed to lock it | |
//we use this as we don't want to block threads that build heap | |
//just the threads that insert and remove values from the heap. | |
if ( artificialLockerTaken == false ) | |
artificialLockerTaken = Monitor.TryEnter( artificialLocker ); | |
else | |
return; | |
if ( artificialThreadScheduler.IsBuilding == false ) | |
artificialThreadScheduler.BuildHeap(); | |
//we taken the lock so we need to exit. | |
if ( artificialLockerTaken == true ) | |
Monitor.Exit( artificialLocker ); | |
} | |
/// <summary> | |
/// Inserts a thread that not orginated in a thread pool to the artificial scheduler, | |
/// in order to enable fair work scheduling and work stealing. | |
/// </summary> | |
/// <param name="thread">SmartThread.</param> | |
internal static void InsertToArtificialScheduler( SmartThread thread ) | |
{ | |
lock ( artificialLocker ) | |
{ | |
artificialThreadScheduler.Insert( thread ); | |
} | |
} | |
/// <summary> | |
/// Removes the given thread that not orginated in a thread pool from the artificial scheduler. | |
/// </summary> | |
/// <param name="thread">SmartThread</param> | |
internal static void RemoveFromArtificialScheduler( SmartThread thread ) | |
{ | |
lock ( artificialLocker ) | |
{ | |
int cnt = 0; | |
int size = artificialThreadScheduler.GetSize(); | |
SmartThread[] artificialThreadItems = new SmartThread[ --size ]; | |
//the linear search in this context should not be to big of a problem, | |
//as if we join then this means that probably our queue is empty so we should | |
//be somwhere in the top of the tree. | |
foreach ( SmartThread artificialThread in artificialThreadScheduler.items ) | |
{ | |
if ( artificialThread.ThreadId != thread.ThreadId ) | |
{ | |
artificialThreadItems[ cnt ] = artificialThread; | |
cnt++; | |
} | |
} | |
artificialThreadScheduler.SetSize( size ); | |
artificialThreadScheduler.items = artificialThreadItems; | |
} | |
} | |
} | |
public enum TaskStatus | |
{ | |
Error, | |
Running, | |
NotStarted, | |
Done | |
} | |
[global::System.Serializable] | |
public class TaskException : Exception | |
{ | |
// | |
// For guidelines regarding the creation of new exception types, see | |
// http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp | |
// and | |
// http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp | |
// | |
public TaskException() { } | |
public TaskException( string message ) : base( message ) { } | |
public TaskException( string message, Exception inner ) : base( message, inner ) { } | |
protected TaskException( | |
System.Runtime.Serialization.SerializationInfo info, | |
System.Runtime.Serialization.StreamingContext context ) : base( info, context ) { } | |
} | |
/// <summary> | |
/// Represents a Task. | |
/// </summary> | |
public class Task | |
{ | |
private ManualResetEvent wait = new ManualResetEvent( false ); | |
private Action action; | |
internal TaskStatus status; | |
private TaskException ex; | |
/// <summary> | |
/// Initializes a task by passign the Action to be executed by it. | |
/// </summary> | |
/// <param name="action">action to be executed.</param> | |
public Task( Action action ) | |
{ | |
this.action = action; | |
status = TaskStatus.NotStarted; | |
} | |
/// <summary> | |
/// Blocks the current thread until the tasks work is done. | |
/// </summary> | |
public void Wait() | |
{ | |
wait.WaitOne(); | |
if ( status == TaskStatus.Error ) | |
throw ex; | |
} | |
/// <summary> | |
/// Runs the task synchronously. | |
/// </summary> | |
public void RunSynchronously() | |
{ | |
RunSynchronously( new Task[] { } ); | |
} | |
internal TaskStatus InternalRun() | |
{ | |
try | |
{ | |
status = TaskStatus.Running; | |
action(); | |
wait.Set(); | |
status = TaskStatus.Done; | |
} | |
catch ( Exception ex ) | |
{ | |
//we need to capture the exception from other threads as we don't want to crash the | |
//pools threads. | |
status = TaskStatus.Error; | |
this.ex = new TaskException( "InternalRun", ex ); | |
} | |
return status; | |
} | |
/// <summary> | |
/// Runs the task asynchronously. | |
/// </summary> | |
public void RunAsynchronously() | |
{ | |
RunAsynchronously( new Task[] { } ); | |
} | |
/// <summary> | |
/// Runs the task synchronously. | |
/// </summary> | |
/// <param name="tasksToWaitFor">the list of tasks to wait for, before running.</param> | |
public void RunSynchronously( params Task[] tasksToWaitFor ) | |
{ | |
WaitForCompletion( tasksToWaitFor ); | |
status = TaskStatus.Running; | |
if ( InternalRun() == TaskStatus.Error ) | |
throw ex; | |
} | |
/// <summary> | |
/// Runs the task asynchronously. | |
/// </summary> | |
/// <param name="tasksToWaitFor">the list of tasks to wait for, before running.</param> | |
public void RunAsynchronously( params Task[] tasksToWaitFor ) | |
{ | |
WaitForCompletion( tasksToWaitFor ); | |
status = TaskStatus.Running; | |
SmartThreadPool.QueueWorkItem( this ); | |
} | |
/// <summary> | |
/// Creates a task. | |
/// </summary> | |
/// <param name="action">The action to be eecuted by a task.</param> | |
/// <returns>Task.</returns> | |
public static Task Create( Action action ) | |
{ | |
return new Task( action ); | |
} | |
private void WaitForCompletion( Task[] tasks ) | |
{ | |
foreach ( Task task in tasks ) | |
task.Wait(); | |
} | |
} | |
/// <summary> | |
/// A smart thread that incorporates work stealing tehniques. | |
/// </summary> | |
public class SmartThread : IComparable<SmartThread> | |
{ | |
/// <summary> | |
/// Native API import function, that is used to Yeild a thread on | |
/// the same processor. | |
/// </summary> | |
/// <returns>the value indicating that the operation was successful.</returns> | |
[DllImport( "kernel32.dll" )] | |
static extern bool SwitchToThread(); | |
private const int waitSpinLmit = 5; | |
private const int waitSpinTime = 30; | |
private readonly Thread thread; | |
internal readonly Queue<Task> scheduler; | |
private readonly ManualResetEvent wait; | |
internal readonly object locker = new object(); | |
private bool isSignalled; | |
private bool isPendingJoin; | |
private int id; | |
private int executionCount; | |
private long totalTime; | |
private bool isInitializedInPool; | |
private int waitSpinCount = 0; | |
/// <summary> | |
/// Gets the ManagedThread Id. | |
/// </summary> | |
public int ThreadId | |
{ | |
get { return id; } | |
} | |
/// <summary> | |
/// Gets the Average task running time. | |
/// </summary> | |
public decimal AvgTaskTime | |
{ | |
get; | |
private set; | |
} | |
/// <summary> | |
/// Gets the TotalTaskTime. | |
/// </summary> | |
/// <remarks> | |
/// This value will get reseted over time, after it reaches the maximum int32 value. | |
/// </remarks> | |
public long TotalTasksTime | |
{ | |
get { return totalTime; } | |
} | |
/// <summary> | |
/// Gets the value that indicated that the thread was created in the ThreadPool. | |
/// </summary> | |
public bool IsInitializedInPool | |
{ | |
get { return isInitializedInPool; } | |
} | |
/// <summary> | |
/// An internal constructor that initializes a smart thread with a parameter | |
/// indicating that this insance is started in a threadpool. | |
/// </summary> | |
/// <param name="isInitializedInPool">Indicates that this instance will live in a threadpool.</param> | |
internal SmartThread( bool isInitializedInPool ) | |
{ | |
this.isInitializedInPool = isInitializedInPool; | |
thread = new Thread( new ThreadStart( Process ) ); | |
wait = new ManualResetEvent( isSignalled ); | |
scheduler = new Queue<Task>(); | |
id = thread.ManagedThreadId; | |
} | |
/// <summary> | |
/// Initializes the SmartThread, in the non thread pool scope, therefor | |
/// this thread will not steal work. | |
/// </summary> | |
public SmartThread() : this( false ) { } | |
/// <summary> | |
/// The thread Processing loop, that consumes up the queue. | |
/// </summary> | |
private void Process() | |
{ | |
Task localAction = null; | |
while ( true ) | |
{ | |
//check if our thread is in the signalled state, | |
//if that's true then reset it and continue work. | |
if ( isSignalled ) | |
{ | |
isSignalled = wait.Reset(); | |
} | |
//TODO: use reader writer locks, or consider hardcore low level locks :D | |
Monitor.Enter( locker ); | |
{ | |
if ( scheduler.Count != 0 ) | |
{ | |
localAction = scheduler.Dequeue(); | |
Monitor.Exit( locker ); | |
} | |
else | |
{ | |
//since there is little to do, then just let threads go. | |
Monitor.Exit( locker ); | |
//lets try to steal some of this work. | |
bool workStolen = TryStealWork(); | |
//if we stolen something then don't sleap and first check you work queue | |
//and then try to steal again (in case you haven't noticed this is a very subtle | |
//form of spin waiting ). | |
if ( workStolen ) | |
continue; | |
if ( isPendingJoin ) | |
break; | |
if ( waitSpinCount++ < waitSpinLmit ) | |
{ | |
//wait and spin, we wake up the thread as this is way more effective when executing | |
//multiple actions. | |
isSignalled = wait.WaitOne( waitSpinTime ); | |
} | |
else | |
{ | |
waitSpinCount = 0; | |
isSignalled = wait.WaitOne(); | |
} | |
} | |
} | |
//Process the action outside the lock! | |
if ( localAction != null ) | |
{ | |
InvokeAction( localAction ); | |
localAction = null; | |
//update the stats. | |
//TODO: Make it smarter, and more secure right now there is a very small deadlock chance!!! | |
SmartThreadPool.Reschedule( isInitializedInPool ); | |
} | |
} | |
//end processing. | |
IsStarted = false; | |
wait.Close(); | |
} | |
/// <summary> | |
/// Invokes the current action. | |
/// </summary> | |
/// <param name="localAction">localAction taken from queue.</param> | |
private void InvokeAction( Task localAction ) | |
{ | |
// start to measure time. | |
int ticksStart = System.Environment.TickCount; | |
// do execute the action. | |
localAction.InternalRun(); | |
localAction.status = TaskStatus.Done; | |
//we do need to reset the stats so that they will not overflow. | |
if ( totalTime > int.MaxValue ) | |
{ | |
executionCount = 0; | |
totalTime = 0; | |
} | |
//increment the counter. | |
executionCount++; | |
totalTime += System.Environment.TickCount - ticksStart; | |
AvgTaskTime = totalTime / executionCount; | |
} | |
/// <summary> | |
/// Tries to steal workload from other heavy loaded threads. | |
/// </summary> | |
/// <returns>a boolan flag indicating the steal success or failure.</returns> | |
internal bool TryStealWork() | |
{ | |
//1. Ask the pool for a thread with the worst stats. | |
//2. Access it's internal queue by calling count and then doing Dequeue | |
// Here we eith hold a lock or we dont lock at all and handle all queue empty exception. | |
SmartThread threadToSteal = SmartThreadPool.GetThreadToSteal( isInitializedInPool ); | |
//This code is needed as ThreadPool might tell us that in some sittuations that we can steal | |
//work from ourselvs for e.g if other thread will join or we will fork ourselfs and the operations | |
//is running. | |
if ( threadToSteal.id != this.id ) | |
{ | |
Task localAction = null; | |
//lock on the thread we are stealing from. | |
lock ( threadToSteal.locker ) | |
{ | |
//perform a steal. | |
if ( threadToSteal.scheduler.Count != 0 ) | |
{ | |
localAction = threadToSteal.scheduler.Dequeue(); | |
} | |
} | |
if ( localAction != null ) | |
{ | |
localAction.RunSynchronously(); | |
return true; | |
} | |
else | |
return false; | |
} | |
return false; | |
} | |
/// <summary> | |
/// Schedules the current action for execution. | |
/// </summary> | |
/// <param name="action">the action to be executed.</param> | |
public void Execute( Task action ) | |
{ | |
scheduler.Enqueue( action ); | |
isSignalled = wait.Set(); | |
} | |
/// <summary> | |
/// Starts the current thread. | |
/// </summary> | |
public void Start() | |
{ | |
if ( isInitializedInPool == false ) | |
SmartThreadPool.InsertToArtificialScheduler( this ); | |
thread.Start(); | |
IsStarted = true; | |
} | |
/// <summary> | |
/// Joins (blocks and releases resources) the current thread. | |
/// </summary> | |
public void Join() | |
{ | |
isPendingJoin = true; | |
} | |
/// <summary> | |
/// Gets the value that indicates if the thread is started. | |
/// </summary> | |
public bool IsStarted | |
{ | |
get; | |
private set; | |
} | |
/// <summary> | |
/// Passes the execution (gives up the time slice) to another working thread | |
/// that's located on the same processor. | |
/// </summary> | |
/// <returns></returns> | |
public bool Yield() | |
{ | |
//for 4.0 we get this for free. | |
#if NET40 | |
return Thread.Yield(); | |
#else | |
return SwitchToThread(); | |
#endif | |
} | |
// Actually threads should not be comparable :( | |
// TODO: Create a comparable inner class. | |
public int CompareTo( SmartThread other ) | |
{ | |
return this.scheduler.Count - other.scheduler.Count; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment