Skip to content

Instantly share code, notes, and snippets.

@andreasohlund
Created September 18, 2015 06:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreasohlund/020eb2a81249689174d5 to your computer and use it in GitHub Desktop.
Save andreasohlund/020eb2a81249689174d5 to your computer and use it in GitHub Desktop.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using MassTransit.Util;
namespace MsmqAsyncSpike
{
using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
class Program
{
// Number of messages FillQueue places on the queue; ProcessMessage also compares the
// processed count against this value to decide when to print the throughput figure.
static int numMessages = 1000;
// Size in bytes of each message body handed to FillQueue by Main (1024 * 1000 bytes).
private static int bodySizeBytes = 1024 * 1000
;
/// <summary>
/// Entry point of the spike: purges and refills the private queue, starts the message pump on a
/// background task, and stops the pump when a key is pressed.
/// </summary>
/// <param name="args">Command line arguments (unused).</param>
static void Main(string[] args)
{
    // Both the token source and the queue are disposable; scoping them with using guarantees
    // they are released even when Purge/FillQueue throws.
    using (var cancellationToken = new CancellationTokenSource())
    {
        using (var queue = new MessageQueue(@".\private$\asyncspike"))
        {
            queue.Purge();
            FillQueue(queue, bodySizeBytes);
            Console.Out.WriteLine("queue filled");

            // StartNew returns Task<Task>; Unwrap exposes the pump's own task so the continuation
            // reports the pump's final status rather than the status of the outer wrapper.
            var task = Task.Factory.StartNew(() => new Program().StartMessagPump(queue, cancellationToken.Token), cancellationToken.Token)
                .Unwrap()
                .ContinueWith(t =>
                {
                    Console.Out.WriteLine("Stopped " + t.Status);
                });

            // First key press: request shutdown and wait for the continuation to finish.
            Console.ReadKey();
            cancellationToken.Cancel();
            task.Wait();
        }

        // The queue is disposed before this line is printed, as in the original flow.
        Console.Out.WriteLine("stopped");
        Console.ReadKey();
    }
}
/// <summary>
/// Sends <c>numMessages</c> messages to the queue in parallel. Every message carries a
/// zero-filled body of <paramref name="numBytes"/> bytes and is sent as its own
/// single-message transaction.
/// </summary>
/// <param name="queue">The queue that receives the messages.</param>
/// <param name="numBytes">Size of each message body in bytes.</param>
static void FillQueue(MessageQueue queue, int numBytes)
{
    Parallel.For(0, numMessages, index =>
    {
        var payload = new byte[numBytes];
        using (var body = new MemoryStream(payload))
        {
            var message = new Message();
            message.BodyStream = body;
            queue.Send(message, MessageQueueTransactionType.Single);
        }
    });
}
/// <summary>
/// Runs the message pump: peeks the queue through an enumerator and, for every message seen,
/// starts a background task that receives and processes one message. At most
/// <c>maxConcurrency</c> processing tasks are in flight at a time.
/// </summary>
/// <param name="queue">The queue to pump messages from.</param>
/// <param name="token">Token that stops the loop; cancelling it while waiting for a free slot
/// surfaces as an <see cref="OperationCanceledException"/> from this method.</param>
/// <returns>A task that completes once the loop has ended and the tracked tasks have finished.</returns>
async Task StartMessagPump(MessageQueue queue, CancellationToken token)
{
    sw.Start();
    using (var enumerator = queue.GetMessageEnumerator2())
    {
        var maxConcurrency = 1000;
        var semaphore = new SemaphoreSlim(maxConcurrency);
        // Set of in-flight processing tasks (the value is unused; the dictionary acts as a concurrent set).
        var tasks = new ConcurrentDictionary<Task, Task>();
        while (!token.IsCancellationRequested)
        {
            // Nothing arrived within the peek timeout: re-check the token and poll again.
            if (!enumerator.MoveNext(TimeSpan.FromMilliseconds(10)))
            {
                continue;
            }

            // Throttle: wait for a free slot before starting another processing task.
            await semaphore.WaitAsync(token);
            var task = Task.Run(async () =>
            {
                await ProcessMessage(queue).ConfigureAwait(false);
            }, token);

            // Register the task BEFORE attaching the cleanup continuation. In the reverse order a
            // task that finishes quickly runs its TryRemove first and is then added afterwards,
            // leaving a completed task in the dictionary forever.
            tasks.TryAdd(task, task);
            task.ContinueWith(t =>
            {
                semaphore.Release();
                Task whoCares;
                tasks.TryRemove(t, out whoCares);
            }, TaskContinuationOptions.ExecuteSynchronously);
        }
        Console.Out.WriteLine("Messages: " + numProcessed);
        Console.Out.WriteLine("Tasks: " + tasks.Count);
        // Drain whatever is still in flight before the enumerator is disposed.
        await Task.WhenAll(tasks.Values);
    }
}
/// <summary>
/// Receives a single message inside a native MSMQ transaction, reads its body to the end,
/// passes it through the processing pipe and commits the transaction.
/// </summary>
/// <param name="queue">The queue to receive from.</param>
/// <returns>A task that completes when the message has been processed and committed.</returns>
async Task ProcessMessage(MessageQueue queue)
{
    // The "pipeline": counts the message and prints the throughput once the last one is seen.
    Func<Message, Task> pipe = received =>
    {
        var processedSoFar = Interlocked.Increment(ref numProcessed);
        if (processedSoFar == numMessages)
        {
            var msgPerSec = numMessages / sw.Elapsed.TotalSeconds;
            Console.Out.WriteLine("Done: {0:0.000}", msgPerSec);
        }

        // Alternative used to simulate slow handlers:
        //return Task.Delay(TimeSpan.FromSeconds(1));
        return Task.FromResult(0);
    };

    // ------------- Native MSMQ transaction ---------------
    using (var tx = new MessageQueueTransaction())
    {
        tx.Begin();
        var receiveTimeout = TimeSpan.FromMilliseconds(10);
        var message = queue.Receive(receiveTimeout, tx);
        using (var sr = new StreamReader(message.BodyStream))
        {
            // The body is read in full; its content is not used afterwards.
            sr.ReadToEnd();
            await pipe(message).ConfigureAwait(false);
        }
        tx.Commit();
    }

    // ------------- Alternative: TransactionScope ---------------
    //using (var tx = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    //{
    //    var message = queue.Receive(TimeSpan.FromMilliseconds(10), MessageQueueTransactionType.Automatic);
    //    await pipe(message);
    //    tx.Complete();
    //}

    // ------------- Alternative: no transaction at all ---------------
    //var message = queue.Receive(TimeSpan.FromSeconds(5));
    //await pipe(message);
}
// Running count of messages that went through the pipe delegate in ProcessMessage;
// incremented there with Interlocked.Increment and printed by StartMessagPump.
static int numProcessed;
// Started by StartMessagPump and read in ProcessMessage to compute the messages-per-second figure.
static Stopwatch sw = new Stopwatch();
}
/// <summary>
/// A task scheduler that runs tasks on the thread pool while ensuring that no more than a
/// fixed number of its worker delegates are queued or running at any time.
/// </summary>
class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    // Indicates whether the current thread is processing work items for this scheduler type.
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;

    // The list of tasks to be executed; every access is protected by lock(_tasks).
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    // The maximum concurrency level allowed by this scheduler.
    private readonly int _maxDegreeOfParallelism;

    // Number of worker delegates currently queued to or running on the thread pool.
    private int _delegatesQueuedOrRunning = 0;

    /// <summary>Creates a new instance with the specified degree of parallelism.</summary>
    /// <param name="maxDegreeOfParallelism">Maximum number of concurrently running workers; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1.</exception>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public sealed override int MaximumConcurrencyLevel
    {
        get { return _maxDegreeOfParallelism; }
    }

    /// <summary>Queues a task to the scheduler.</summary>
    /// <param name="task">The task to queue.</param>
    protected sealed override void QueueTask(Task task)
    {
        // Append the task, and start another worker only while fewer than the allowed
        // number of workers are queued or running.
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

    /// <summary>Attempts to execute the specified task on the current thread.</summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task already sits in this scheduler's queue.</param>
    /// <returns>true if the task was executed inline; otherwise false.</returns>
    protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inlining is only supported on threads that are already running this scheduler's work.
        if (!_currentThreadIsProcessingItems)
        {
            return false;
        }

        // A previously queued task must first be taken out of the queue; if it is no longer
        // there, it must not be run here.
        if (taskWasPreviouslyQueued && !TryDequeue(task))
        {
            return false;
        }

        return TryExecuteTask(task);
    }

    /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
    /// <param name="task">The task to remove.</param>
    /// <returns>true if the task was found and removed; otherwise false.</returns>
    protected sealed override bool TryDequeue(Task task)
    {
        lock (_tasks)
        {
            return _tasks.Remove(task);
        }
    }

    /// <summary>Gets a snapshot of the tasks currently scheduled on this scheduler.</summary>
    /// <returns>A copy of the pending task list taken while holding the queue lock.</returns>
    /// <exception cref="NotSupportedException">The queue lock could not be taken without blocking.</exception>
    protected sealed override IEnumerable<Task> GetScheduledTasks()
    {
        var lockTaken = false;
        try
        {
            // TryEnter (rather than lock) so that this call never blocks when the lock is held elsewhere.
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (!lockTaken)
            {
                throw new NotSupportedException();
            }

            // Return a copy: handing out the live list would let the caller enumerate it
            // after the lock is released, while worker threads keep mutating it.
            return new List<Task>(_tasks);
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(_tasks);
            }
        }
    }

    // Queues a worker delegate to the thread pool that drains the task list.
    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Mark this thread as processing items; this is what enables inlining on it.
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue.
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // No more items: record that this worker is finished and leave.
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;
                            break;
                        }

                        // Take the next item from the front of the queue.
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }

                    // Execute the task outside the lock.
                    TryExecuteTask(item);
                }
            }
            finally
            {
                // This thread is done processing items.
                _currentThreadIsProcessingItems = false;
            }
        }, null);
    }
}
}
// Copyright 2007-2015 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Util
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Provides a TaskScheduler that provides control over priorities, fairness, and the underlying threads utilized.
/// </summary>
[DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView)), DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")]
public sealed class QueuedTaskScheduler : TaskScheduler,
IDisposable
{
/// <summary>
/// Whether we're processing tasks on the current thread. Set by the dispatch loops and
/// consulted by TryExecuteTaskInline to decide whether inlining is allowed.
/// </summary>
static readonly ThreadLocal<bool> _taskProcessingThread = new ThreadLocal<bool>();
/// <summary>
/// The collection of tasks to be executed on our custom threads. A null entry is a
/// placeholder for a task waiting in one of the prioritized sub-queues.
/// </summary>
readonly BlockingCollection<Task> _blockingTaskQueue;
/// <summary>
/// The maximum allowed concurrency level of this scheduler. If custom threads are
/// used, this represents the number of created threads.
/// </summary>
readonly int _concurrencyLevel;
/// <summary>Cancellation token source used for disposal; cancelled by Dispose.</summary>
readonly CancellationTokenSource _disposeCancellation = new CancellationTokenSource();
// ***
// *** For when using a target scheduler
// ***
/// <summary>
/// The queue of tasks to process when using an underlying target scheduler.
/// QueueTask and the processing loop access it under lock(_nonthreadsafeTaskQueue).
/// </summary>
readonly Queue<Task> _nonthreadsafeTaskQueue;
/// <summary>
/// A sorted list of round-robin queue lists. Tasks with the smallest priority value
/// are preferred. Priority groups are round-robin'd through in order of priority.
/// </summary>
readonly SortedList<int, QueueGroup> _queueGroups = new SortedList<int, QueueGroup>();
/// <summary>The scheduler onto which actual work is scheduled; null when our own threads are used.</summary>
readonly TaskScheduler _targetScheduler;
// ***
// *** For when using our own threads
// ***
/// <summary>The threads used by the scheduler to process work.</summary>
readonly Thread[] _threads;
/// <summary>The number of processing delegates that have been queued or that are running while using an underlying scheduler.</summary>
int _delegatesQueuedOrRunning;
// ***
/// <summary>
/// Initializes the scheduler on top of the default task scheduler, passing 0 as the
/// maximum concurrency level.
/// </summary>
public QueuedTaskScheduler() : this(Default, 0)
{
}
/// <summary>
/// Initializes the scheduler on top of the given scheduler, passing 0 as the maximum
/// concurrency level.
/// </summary>
/// <param name="targetScheduler">The target underlying scheduler onto which this scheduler's work is queued.</param>
public QueuedTaskScheduler(TaskScheduler targetScheduler) : this(targetScheduler, 0)
{
}
/// <summary>Initializes the scheduler so that it runs its work on an underlying scheduler.</summary>
/// <param name="targetScheduler">The target underlying scheduler onto which this scheduler's work is queued.</param>
/// <param name="maxConcurrencyLevel">
/// The maximum degree of concurrency allowed for this scheduler's work; 0 selects the number of logical processors.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="targetScheduler"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrencyLevel"/> is negative.</exception>
public QueuedTaskScheduler(TaskScheduler targetScheduler, int maxConcurrencyLevel)
{
    if (targetScheduler == null)
    {
        throw new ArgumentNullException("targetScheduler");
    }

    if (maxConcurrencyLevel < 0)
    {
        throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
    }

    // Only the fields for the target-scheduler mode are set up here; the fields used by the
    // dedicated-thread mode stay at their defaults.
    _targetScheduler = targetScheduler;
    _nonthreadsafeTaskQueue = new Queue<Task>();

    // 0 means "number of logical processors"; in any case never exceed what the
    // underlying scheduler itself allows.
    int requested = maxConcurrencyLevel == 0 ? Environment.ProcessorCount : maxConcurrencyLevel;
    int targetLimit = targetScheduler.MaximumConcurrencyLevel;
    bool targetIsStricter = targetLimit > 0 && targetLimit < requested;
    _concurrencyLevel = targetIsStricter ? targetLimit : requested;
}
/// <summary>
/// Initializes the scheduler with its own threads, using an empty thread name, background
/// threads, normal priority, the MTA apartment state, a stack size of 0 and no
/// init/finally routines.
/// </summary>
/// <param name="threadCount">The number of threads to create and use for processing work items.</param>
public QueuedTaskScheduler(int threadCount) : this(
    threadCount,
    string.Empty,
    false,
    ThreadPriority.Normal,
    ApartmentState.MTA,
    0,
    null,
    null)
{
}
/// <summary>Initializes the scheduler so that it runs its work on dedicated threads.</summary>
/// <param name="threadCount">
/// The number of threads to create and use for processing work items; 0 selects the number of logical processors.
/// </param>
/// <param name="threadName">The name to use for each of the created threads; null leaves the threads unnamed.</param>
/// <param name="useForegroundThreads">A Boolean value that indicates whether to use foreground threads instead of background.</param>
/// <param name="threadPriority">The priority to assign to each thread.</param>
/// <param name="threadApartmentState">The apartment state to use for each thread.</param>
/// <param name="threadMaxStackSize">The stack size to use for each thread.</param>
/// <param name="threadInit">An initialization routine to run on each thread.</param>
/// <param name="threadFinally">A finalization routine to run on each thread.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="threadCount"/> is negative.</exception>
public QueuedTaskScheduler(
    int threadCount,
    string threadName = "",
    bool useForegroundThreads = false,
    ThreadPriority threadPriority = ThreadPriority.Normal,
    ApartmentState threadApartmentState = ApartmentState.MTA,
    int threadMaxStackSize = 0,
    Action threadInit = null,
    Action threadFinally = null)
{
    // Validates arguments (some validation is left up to the Thread type itself).
    if (threadCount < 0)
        throw new ArgumentOutOfRangeException("threadCount");

    // If the thread count is 0, default to the number of logical processors.
    _concurrencyLevel = threadCount == 0 ? Environment.ProcessorCount : threadCount;

    // Initialize the queue used for storing tasks
    _blockingTaskQueue = new BlockingCollection<Task>();

    // Create one thread per unit of concurrency. Sizing by the resolved concurrency level
    // (not the raw argument) matters for threadCount == 0: sizing by the argument would create
    // no threads at all, and queued tasks would never run.
    _threads = new Thread[_concurrencyLevel];
    for (int i = 0; i < _threads.Length; i++)
    {
        _threads[i] = new Thread(() => ThreadBasedDispatchLoop(threadInit, threadFinally), threadMaxStackSize)
        {
            Priority = threadPriority,
            IsBackground = !useForegroundThreads,
        };
        if (threadName != null)
            _threads[i].Name = threadName + " (" + i + ")";
        _threads[i].SetApartmentState(threadApartmentState);
    }

    // Start all of the threads only after every one of them has been configured.
    foreach (Thread thread in _threads)
        thread.Start();
}
/// <summary>Gets the total number of queues across all priority groups (used by the debugger display).</summary>
int DebugQueueCount
{
    get
    {
        int total = 0;
        foreach (QueueGroup queues in _queueGroups.Values)
        {
            total += queues.Count;
        }

        return total;
    }
}
/// <summary>
/// Gets the number of tasks queued directly to this scheduler (used by the debugger display).
/// Null placeholder entries are not counted.
/// </summary>
int DebugTaskCount
{
    get
    {
        IEnumerable<Task> pending;
        if (_targetScheduler != null)
        {
            pending = _nonthreadsafeTaskQueue;
        }
        else
        {
            pending = _blockingTaskQueue;
        }

        return pending.Count(t => t != null);
    }
}
/// <summary>Gets the concurrency level that was resolved when this scheduler was constructed.</summary>
public override int MaximumConcurrencyLevel
{
    get
    {
        return _concurrencyLevel;
    }
}
/// <summary>
/// Initiates shutdown of the scheduler by cancelling the disposal token. After this,
/// QueueTask rejects new work and the dispatch loops stop.
/// </summary>
public void Dispose()
{
    _disposeCancellation.Cancel();
}
/// <summary>
/// The dispatch loop run by all threads in this scheduler. It consumes entries from the
/// blocking queue until the disposal token is cancelled, executing directly queued tasks and
/// resolving null placeholders to a task from the prioritized queues.
/// </summary>
/// <param name="threadInit">An initialization routine to run when the thread begins; may be null.</param>
/// <param name="threadFinally">A finalization routine to run before the thread ends; may be null.</param>
void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally)
{
// Mark this thread as one of ours so that TryExecuteTaskInline allows inlining on it.
_taskProcessingThread.Value = true;
// Note: the init routine runs before the try block below, so the finally block does not
// run if the init routine throws.
if (threadInit != null)
threadInit();
try
{
// If the scheduler is disposed, the cancellation token will be set and
// we'll receive an OperationCanceledException. That OCE should not crash the process.
try
{
// If a thread abort occurs, we'll try to reset it and continue running.
while (true)
{
try
{
// For each task queued to the scheduler, try to execute it.
// The enumeration blocks while the queue is empty and is cancelled through the disposal token.
foreach (Task task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
{
// If the task is not null, that means it was queued to this scheduler directly.
// Run it.
if (task != null)
TryExecuteTask(task);
// If the task is null, that means it's just a placeholder for a task
// queued to one of the subschedulers. Find the next task based on
// priority and fairness and run it.
else
{
// Find the next task based on our ordering rules...
Task targetTask;
QueuedTaskSchedulerQueue queueForTargetTask;
lock (_queueGroups)
FindNextTaskNeedsLock(out targetTask, out queueForTargetTask);
// ... and if we found one, run it through the queue it was queued to
if (targetTask != null)
queueForTargetTask.ExecuteTask(targetTask);
}
}
}
catch (ThreadAbortException)
{
// If we received a thread abort, and that thread abort was due to shutting down
// or unloading, let it pass through. Otherwise, reset the abort so we can
// continue processing work items.
if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
Thread.ResetAbort();
}
}
}
catch (OperationCanceledException)
{
// Deliberately empty: cancellation is the normal way out of the loop after Dispose.
}
}
finally
{
// Run a cleanup routine if there was one
if (threadFinally != null)
threadFinally();
// This thread no longer processes tasks for the scheduler.
_taskProcessingThread.Value = false;
}
}
/// <summary>
/// Find the next task that should be executed, based on priorities and fairness and the like.
/// Must be called while holding the lock on the queue groups.
/// </summary>
/// <param name="targetTask">The found task, or null if none was found.</param>
/// <param name="queueForTargetTask">
/// The scheduler associated with the found task, or null if none was found. Due to security
/// checks inside of TPL, this scheduler needs to be used to execute that task.
/// </param>
void FindNextTaskNeedsLock(out Task targetTask, out QueuedTaskSchedulerQueue queueForTargetTask)
{
    targetTask = null;
    queueForTargetTask = null;

    // Look through each of our queue groups in sorted order.
    // This ordering is based on the priority of the queues.
    foreach (var queueGroup in _queueGroups)
    {
        QueueGroup queues = queueGroup.Value;

        // Within each group, iterate through the queues in a round-robin
        // fashion. Every time we iterate again and successfully find a task,
        // we'll start in the next location in the group.
        foreach (int i in queues.CreateSearchOrder())
        {
            QueuedTaskSchedulerQueue candidate = queues[i];
            Queue<Task> items = candidate._workItems;
            if (items.Count == 0)
                continue;

            targetTask = items.Dequeue();
            queueForTargetTask = candidate;

            // A disposed queue is dropped from its group as soon as it has been drained.
            int searchStart = queues.NextQueueIndex;
            bool queueRemoved = false;
            if (candidate._disposed && items.Count == 0)
            {
                RemoveQueueNeedsLock(candidate);
                queueRemoved = true;
            }

            // Advance the round-robin start by one position. If the removed queue sat at or
            // before the old start, every later queue moved down one slot, so the old
            // start index already denotes the next queue.
            int nextStart = (queueRemoved && searchStart >= i) ? searchStart : searchStart + 1;

            // The group can be empty after the removal; a modulo by its count would then
            // throw DivideByZeroException.
            queues.NextQueueIndex = queues.Count == 0 ? 0 : nextStart % queues.Count;
            return;
        }
    }
}
/// <summary>
/// Queues a task to the scheduler. A null task is a placeholder announcing work in one of
/// the prioritized queues.
/// </summary>
/// <param name="task">The task to be queued.</param>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
protected override void QueueTask(Task task)
{
    // Nothing may be queued once disposal has been requested.
    if (_disposeCancellation.IsCancellationRequested)
    {
        throw new ObjectDisposedException(GetType().Name);
    }

    // Dedicated-thread mode: the dispatch threads consume straight from the blocking queue.
    if (_targetScheduler == null)
    {
        _blockingTaskQueue.Add(task);
        return;
    }

    // Target-scheduler mode: enqueue, and decide under the lock whether one more processing
    // delegate is needed so that concurrent callers cannot start too many.
    bool needsWorker;
    lock (_nonthreadsafeTaskQueue)
    {
        _nonthreadsafeTaskQueue.Enqueue(task);
        needsWorker = _delegatesQueuedOrRunning < _concurrencyLevel;
        if (needsWorker)
        {
            _delegatesQueuedOrRunning++;
        }
    }

    if (!needsWorker)
    {
        return;
    }

    // Start the processing loop asynchronously on the underlying scheduler.
    Task.Factory.StartNew(
        ProcessPrioritizedAndBatchedTasks,
        CancellationToken.None,
        TaskCreationOptions.None,
        _targetScheduler);
}
/// <summary>
/// Process tasks one at a time in the best order.
/// This should be run in a Task generated by QueueTask.
/// It's been separated out into its own method to show up better in Parallel Tasks.
/// </summary>
/// <remarks>
/// Used only in target-scheduler mode: drains the non-thread-safe queue under its lock and
/// stops once the queue is observed empty or disposal has been requested.
/// </remarks>
void ProcessPrioritizedAndBatchedTasks()
{
bool continueProcessing = true;
while (!_disposeCancellation.IsCancellationRequested && continueProcessing)
{
try
{
// Note that we're processing tasks on this thread
_taskProcessingThread.Value = true;
// Until there are no more tasks to process
while (!_disposeCancellation.IsCancellationRequested)
{
// Try to get the next task. If there aren't any more, we're done.
Task targetTask;
lock (_nonthreadsafeTaskQueue)
{
if (_nonthreadsafeTaskQueue.Count == 0)
break;
targetTask = _nonthreadsafeTaskQueue.Dequeue();
}
// If the task is null, it's a placeholder for a task in the round-robin queues.
// Find the next one that should be processed.
QueuedTaskSchedulerQueue queueForTargetTask = null;
if (targetTask == null)
{
lock (_queueGroups)
FindNextTaskNeedsLock(out targetTask, out queueForTargetTask);
}
// Now if we finally have a task, run it. If the task
// was associated with one of the round-robin schedulers, we need to use it
// as a thunk to execute its task.
if (targetTask != null)
{
if (queueForTargetTask != null)
queueForTargetTask.ExecuteTask(targetTask);
else
TryExecuteTask(targetTask);
}
}
}
finally
{
// Now that we think we're done, verify that there really is
// no more work to do. If there's not, highlight
// that we're now less parallel than we were a moment ago.
// If new work arrived in the meantime, continueProcessing stays true and the outer loop runs again.
// NOTE(review): when disposal is requested while the queue still holds items, this branch is
// skipped, so the counter and the thread flag keep their values — confirm that is intended.
lock (_nonthreadsafeTaskQueue)
{
if (_nonthreadsafeTaskQueue.Count == 0)
{
_delegatesQueuedOrRunning--;
continueProcessing = false;
_taskProcessingThread.Value = false;
}
}
}
}
}
/// <summary>
/// Notifies the pool that there's a new item to be executed in one of the round-robin queues
/// by queueing a null placeholder task.
/// </summary>
void NotifyNewWorkItem()
{
    Task placeholder = null;
    QueueTask(placeholder);
}
/// <summary>Tries to execute a task synchronously on the current thread.</summary>
/// <param name="task">The task to execute.</param>
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued (not consulted here).</param>
/// <returns>true if the task was executed; otherwise, false.</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    // Inlining is only enabled on threads that are already running this scheduler's tasks.
    if (!_taskProcessingThread.Value)
    {
        return false;
    }

    return TryExecuteTask(task);
}
/// <summary>Gets the tasks scheduled to this scheduler.</summary>
/// <returns>A list of all tasks queued directly to this scheduler, without the null placeholders.</returns>
/// <remarks>This does not include the tasks on sub-schedulers. Those will be retrieved by the debugger separately.</remarks>
protected override IEnumerable<Task> GetScheduledTasks()
{
    // Pick the queue that belongs to the active mode: the blocking queue when running on our
    // own threads, otherwise the queue used with the underlying scheduler.
    IEnumerable<Task> queued = _targetScheduler == null
        ? (IEnumerable<Task>)_blockingTaskQueue
        : _nonthreadsafeTaskQueue;

    // Nulls are only placeholders for tasks held by sub-schedulers, so they are filtered out.
    return queued.Where(t => t != null).ToList();
}
/// <summary>Creates and activates a new scheduling queue for this scheduler at priority 0.</summary>
/// <returns>The newly created and activated queue.</returns>
public TaskScheduler ActivateNewQueue()
{
    const int defaultPriority = 0;
    return ActivateNewQueue(defaultPriority);
}
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <param name="priority">The priority level for the new queue.</param>
/// <returns>The newly created and activated queue at the specified priority.</returns>
public TaskScheduler ActivateNewQueue(int priority)
{
    var queue = new QueuedTaskSchedulerQueue(priority, this);

    lock (_queueGroups)
    {
        // Queues of equal priority share one group; create the group on first use.
        QueueGroup group;
        bool groupExists = _queueGroups.TryGetValue(priority, out group);
        if (!groupExists)
        {
            group = new QueueGroup();
            _queueGroups.Add(priority, group);
        }

        group.Add(queue);
    }

    return queue;
}
/// <summary>
/// Removes a scheduler from its priority group. Must be called while holding the lock on the
/// queue groups. Does nothing when the queue is no longer part of the group.
/// </summary>
/// <param name="queue">The scheduler to be removed.</param>
void RemoveQueueNeedsLock(QueuedTaskSchedulerQueue queue)
{
    // Find the group that contains the queue and the queue's index within the group
    QueueGroup queueGroup = _queueGroups[queue._priority];
    int index = queueGroup.IndexOf(queue);

    // Already removed (for example by an earlier removal of the same queue): without this
    // guard RemoveAt(-1) would throw.
    if (index < 0)
        return;

    // We're about to remove the queue, so adjust the index of the next
    // round-robin starting location if it'll be affected by the removal.
    // The start index is never moved below zero: a negative start would make the next
    // search index the group at -1.
    if (queueGroup.NextQueueIndex >= index && queueGroup.NextQueueIndex > 0)
        queueGroup.NextQueueIndex--;

    // Remove it
    queueGroup.RemoveAt(index);
}
/// <summary>A group of queues at the same priority level.</summary>
class QueueGroup : List<QueuedTaskSchedulerQueue>
{
    /// <summary>The starting index for the next round-robin traversal.</summary>
    public int NextQueueIndex;

    /// <summary>
    /// Creates a search order through this group: from the round-robin start to the end of
    /// the group, then from the beginning up to (but excluding) the start.
    /// </summary>
    /// <returns>A lazily evaluated sequence of indices into this group.</returns>
    public IEnumerable<int> CreateSearchOrder()
    {
        int position = NextQueueIndex;
        while (position < Count)
        {
            yield return position;
            position++;
        }

        position = 0;
        while (position < NextQueueIndex)
        {
            yield return position;
            position++;
        }
    }
}
/// <summary>Debugger type proxy for the QueuedTaskScheduler.</summary>
class QueuedTaskSchedulerDebugView
{
    /// <summary>The scheduler being inspected.</summary>
    readonly QueuedTaskScheduler _scheduler;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="scheduler">The scheduler to inspect.</param>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        _scheduler = scheduler;
    }

    /// <summary>Gets all of the Tasks queued to the scheduler directly (null placeholders excluded).</summary>
    public IEnumerable<Task> ScheduledTasks
    {
        get
        {
            IEnumerable<Task> source;
            if (_scheduler._targetScheduler != null)
            {
                source = _scheduler._nonthreadsafeTaskQueue;
            }
            else
            {
                source = _scheduler._blockingTaskQueue;
            }

            var scheduled = new List<Task>();
            foreach (Task candidate in source)
            {
                if (candidate != null)
                {
                    scheduled.Add(candidate);
                }
            }

            return scheduled;
        }
    }

    /// <summary>Gets the prioritized and fair queues, group by group in priority order.</summary>
    public IEnumerable<TaskScheduler> Queues
    {
        get
        {
            return _scheduler._queueGroups
                .SelectMany(group => (IEnumerable<TaskScheduler>)group.Value)
                .ToList();
        }
    }
}
/// <summary>
/// Provides a scheduling queue associated with a QueuedTaskScheduler. Tasks queued here are
/// stored locally and executed by the owning scheduler according to priority and round-robin order.
/// </summary>
[DebuggerDisplay("QueuePriority = {_priority}, WaitingTasks = {WaitingTasks}"), DebuggerTypeProxy(typeof(QueuedTaskSchedulerQueueDebugView))]
sealed class QueuedTaskSchedulerQueue : TaskScheduler,
    IDisposable
{
    /// <summary>The scheduler with which this queue is associated.</summary>
    readonly QueuedTaskScheduler _pool;

    /// <summary>The priority for this queue.</summary>
    internal readonly int _priority;

    /// <summary>The work items stored in this queue; modified under the owning scheduler's queue-group lock.</summary>
    internal readonly Queue<Task> _workItems;

    /// <summary>Whether this queue has been disposed; written under the owning scheduler's queue-group lock.</summary>
    internal bool _disposed;

    /// <summary>Initializes the queue.</summary>
    /// <param name="priority">The priority associated with this queue.</param>
    /// <param name="pool">The scheduler with which this queue is associated.</param>
    internal QueuedTaskSchedulerQueue(int priority, QueuedTaskScheduler pool)
    {
        _priority = priority;
        _pool = pool;
        _workItems = new Queue<Task>();
    }

    /// <summary>Gets the number of tasks waiting in this scheduler.</summary>
    internal int WaitingTasks
    {
        get { return _workItems.Count; }
    }

    /// <summary>Gets the maximum concurrency level to use when processing tasks.</summary>
    public override int MaximumConcurrencyLevel
    {
        get { return _pool.MaximumConcurrencyLevel; }
    }

    /// <summary>Signals that the queue should be removed from the scheduler as soon as the queue is empty.</summary>
    public void Dispose()
    {
        // The disposed flag is checked and set inside the same lock that guards queueing and
        // removal. Doing it outside would let QueueTask enqueue into a queue that has just been
        // removed (the task would never run), and would let two concurrent Dispose calls both
        // attempt the removal.
        lock (_pool._queueGroups)
        {
            if (_disposed)
                return;
            _disposed = true;

            // We only remove the queue if it's empty. If it's not empty,
            // it stays marked as disposed, and the associated QueuedTaskScheduler
            // will remove the queue when its count hits 0 and its _disposed is true.
            if (_workItems.Count == 0)
                _pool.RemoveQueueNeedsLock(this);
        }
    }

    /// <summary>Gets the tasks scheduled to this scheduler.</summary>
    /// <returns>A copy of all tasks queued to this scheduler.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _workItems.ToList();
    }

    /// <summary>Queues a task to the scheduler.</summary>
    /// <param name="task">The task to be queued.</param>
    /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
    protected override void QueueTask(Task task)
    {
        // Queue up the task locally to this queue. The disposed check happens under the
        // lock so that it cannot race with Dispose removing the queue from its group.
        lock (_pool._queueGroups)
        {
            if (_disposed)
                throw new ObjectDisposedException(GetType().Name);
            _workItems.Enqueue(task);
        }

        // Notify the parent scheduler (outside the lock) that there's work available.
        _pool.NotifyNewWorkItem();
    }

    /// <summary>Tries to execute a task synchronously on the current thread.</summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>true if the task was executed; otherwise, false.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // If we're using our own threads and if this is being called from one of them,
        // or if we're currently processing another task on this thread, try running it inline.
        return _taskProcessingThread.Value && TryExecuteTask(task);
    }

    /// <summary>Runs the specified task.</summary>
    /// <param name="task">The task to execute.</param>
    internal void ExecuteTask(Task task)
    {
        TryExecuteTask(task);
    }

    /// <summary>A debug view for the queue.</summary>
    sealed class QueuedTaskSchedulerQueueDebugView
    {
        /// <summary>The queue.</summary>
        readonly QueuedTaskSchedulerQueue _queue;

        /// <summary>Initializes the debug view.</summary>
        /// <param name="queue">The queue to be debugged.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
        public QueuedTaskSchedulerQueueDebugView(QueuedTaskSchedulerQueue queue)
        {
            if (queue == null)
                throw new ArgumentNullException("queue");
            _queue = queue;
        }

        /// <summary>Gets the priority of this queue in its associated scheduler.</summary>
        public int Priority
        {
            get { return _queue._priority; }
        }

        /// <summary>Gets the ID of this scheduler.</summary>
        public int Id
        {
            get { return _queue.Id; }
        }

        /// <summary>Gets all of the tasks scheduled to this queue.</summary>
        public IEnumerable<Task> ScheduledTasks
        {
            get { return _queue.GetScheduledTasks(); }
        }

        /// <summary>Gets the QueuedTaskScheduler with which this queue is associated.</summary>
        public QueuedTaskScheduler AssociatedScheduler
        {
            get { return _queue._pool; }
        }
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment