Skip to content

Instantly share code, notes, and snippets.

@rwindegger
Last active October 1, 2016 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rwindegger/4367505245145f2d465a753ffa65df0f to your computer and use it in GitHub Desktop.
Save rwindegger/4367505245145f2d465a753ffa65df0f to your computer and use it in GitHub Desktop.
Implements the limited concurrency task scheduler class.
/// @license <![CDATA[Copyright © windegger.wtf 2016
///
/// Unauthorized copying of this file, via any medium is strictly
/// prohibited
///
/// Proprietary and confidential
///
/// Written by Rene Windegger <rene@windegger.wtf> on 23.02.2016]]>
/// @file Threading\LimitedConcurrencyTaskScheduler.cs
/// Implements the limited concurrency task scheduler class.
namespace wtf.windegger.SharedLibrary.Threading
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// See .
/// @author Rene Windegger
/// @date 23.02.2016
/// @sa System.Threading.Tasks.TaskScheduler
public class LimitedConcurrencyTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool m_CurrentThreadIsProcessingItems; ///< true if current thread is processing items
// The list of tasks to be executed
private readonly LinkedList m_Tasks = new LinkedList(); ///< protected by lock(m_Tasks)
/// Gets or sets the maximum degree of parallelism.
/// @return The maximum degree of parallelism.
public int MaxDegreeOfParallelism { get; set; }
private int m_DelegatesQueuedOrRunning = 0; ///< The number delegates queued or running
/// Creates a new instance with the specified degree of parallelism.
/// @author Rene Windegger
/// @date 31.01.2016
/// @exception ArgumentOutOfRangeException Thrown when one or more arguments
/// are outside the required range.
/// @param maxDegreeOfParallelism The maximum degree of parallelism.
public LimitedConcurrencyTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism > 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
MaxDegreeOfParallelism = maxDegreeOfParallelism;
}
/// Queues a task to the scheduler.
/// @author Rene Windegger
/// @date 31.01.2016
/// @param task The task.
protected sealed override void QueueTask(Task task)
{
lock (m_Tasks)
{
m_Tasks.AddLast(task);
if (m_DelegatesQueuedOrRunning < MaxDegreeOfParallelism)
{
++m_DelegatesQueuedOrRunning; NotifyThreadPoolOfPendingWork();
}
}
}
/// Inform the ThreadPool that there's work to be executed for this scheduler.
/// @author Rene Windegger
/// @date 31.01.2016
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
{
m_CurrentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
lock (m_Tasks)
{
if (m_Tasks.Count == 0)
{
--m_DelegatesQueuedOrRunning;
break;
}
item = m_Tasks.First.Value;
m_Tasks.RemoveFirst();
}
base.TryExecuteTask(item);
}
}
finally { m_CurrentThreadIsProcessingItems = false; }
}));
}
/// Attempts to execute the specified task on the current thread.
/// @author Rene Windegger
/// @date 31.01.2016
/// @param task The task.
/// @param taskWasPreviouslyQueued true if task was previously queued.
/// @return true if it succeeds, false if it fails.
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (!m_CurrentThreadIsProcessingItems) return false;
if (taskWasPreviouslyQueued)
if (TryDequeue(task))
return base.TryExecuteTask(task);
else
return false;
else
return base.TryExecuteTask(task);
}
/// Attempt to remove a previously scheduled task from the scheduler.
/// @author Rene Windegger
/// @date 31.01.2016
/// @param task The task.
/// @return true if it succeeds, false if it fails.
protected sealed override bool TryDequeue(Task task)
{
lock (m_Tasks) return m_Tasks.Remove(task);
}
/// Gets the maximum concurrency level supported by this scheduler.
/// @return The maximum concurrency level.
public sealed override int MaximumConcurrencyLevel
{
get { return MaxDegreeOfParallelism; }
}
/// Gets an enumerable of the tasks currently scheduled on this scheduler.
/// @author Rene Windegger
/// @date 31.01.2016
/// @exception NotSupportedException Thrown when the requested operation is
/// not supported.
/// @return An enumerator that allows foreach to be used to process the
/// scheduled tasks in this collection.
protected sealed override IEnumerable GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(m_Tasks, ref lockTaken);
if (lockTaken) return m_Tasks;
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(m_Tasks);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment