Skip to content

Instantly share code, notes, and snippets.

@Rottweiler
Forked from pigeonhands/ThreadQueue.cs
Created January 18, 2017 20:21
Show Gist options
  • Save Rottweiler/63f9a2e4f83383a8e867ce889b60f954 to your computer and use it in GitHub Desktop.
Save Rottweiler/63f9a2e4f83383a8e867ce889b60f954 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public delegate void ThreadQueueWorkDelegate<T>(T task);
/// <summary>
/// ThreadQueue
/// Made by BahNahNah
/// </summary>
public class ThreadQueue<T> : IDisposable
{
private Thread[] threads = null;
private Queue<object> WorkQueue = null;
private bool RunNewTasks = true;
/// <summary>
/// Callback to process items
/// </summary>
public event ThreadQueueWorkDelegate<T> Work;
/// <summary>
/// Indicates the status of the backround threads
/// </summary>
public bool ThreadsRunning { get; private set; }
/// <summary>
/// Number of threads in the current ThreadQue
/// </summary>
public int Threads { get; private set; }
#region " Constructors "
/// <summary>
/// Creates a new instance of ThreadQue
/// </summary>
/// <param name="threadAmmount">Number of threads</param>
/// <param name="startThreads">If false, call <see cref="StartThreads"/> function to start.</param>
public ThreadQueue(int threadAmmount, bool startThreads)
{
Threads = threadAmmount;
ThreadsRunning = false;
threads = new Thread[threadAmmount];
WorkQueue = new Queue<object>();
if(startThreads)
StartThreads();
}
/// <summary>
/// Creates a new instance of ThreadQue. Workers are not started automaticly. call <see cref="StartThreads"/> function to start them.
/// </summary>
/// <param name="threadAmmount">Number of threads</param>
/// <param name="items">Initial values to be passed to the work que</param>
public ThreadQueue(int threadAmmount, T[] items)
{
Threads = threadAmmount;
ThreadsRunning = false;
threads = new Thread[threadAmmount];
WorkQueue = new Queue<object>(items.Length);
foreach (T i in items)
WorkQueue.Enqueue(i);
}
/// <summary>
/// Creates a new instance of ThreadQueue. Workers are not started automaticly. call <see cref="StartThreads"/> function to start them.
/// </summary>
/// <param name="items">Initial values to be passed to the work que</param>
public ThreadQueue(T[] items) : this(Environment.ProcessorCount, items)
{
}
/// <summary>
/// Creates a new instance of ThreadQueue with default values.
/// </summary>
public ThreadQueue() : this(Environment.ProcessorCount, true)
{
}
/// <summary>
/// Creates a new instance of ThreadQueue.
/// </summary>
/// <param name="threads">Number of workers</param>
public ThreadQueue(int threads) : this(threads, true)
{
}
#endregion
#region " Public Functions "
/// <summary>
/// Waits for tasks to compete
/// </summary>
/// <param name="fullCompletion">If true, all running and qued tasks will be completed. If false, only running threads will be completed and queue will be cleared.</param>
/// <param name="newThreadCount">Changes the number of threads working</param>
public void WaitForCompletion(bool fullCompletion, int newThreadCount)
{
RunNewTasks = fullCompletion;
lock (WorkQueue)
{
foreach (Thread t in threads)
WorkQueue.Enqueue(null);
Monitor.PulseAll(WorkQueue);
}
foreach (Thread t in threads)
t?.Join();
if (!fullCompletion)
{
lock (WorkQueue)
{
WorkQueue.Clear();
}
}
if (newThreadCount != Threads)
threads = new Thread[newThreadCount];
Threads = newThreadCount;
StartThreadsUnsafe();
}
/// <summary>
/// Waits for tasks to compete
/// </summary>
/// <param name="fullCompletion">If true, all running and qued tasks will be completed. If false, only running threads will be completed and queue will be cleared.</param>
public void WaitForCompletion(bool fullCompletion)
{
WaitForCompletion(fullCompletion, Threads);
}
/// <summary>
/// Waits for tasks to compete
/// </summary>
/// <param name="fullCompletion">If true, all running and qued tasks will be completed. If false, only running threads will be completed and queue will be cleared.</param>
/// <param name="newThreadCount">Changes the number of threads working</param>
public async Task WaitForCompletionAsync(bool fullCompletion, int newThreadCount)
{
await Task.Factory.StartNew(() => WaitForCompletion(fullCompletion, newThreadCount));
}
/// <summary>
/// Waits for tasks to compete
/// </summary>
/// <param name="fullCompletion">If true, all running and qued tasks will be completed. If false, only running threads will be completed and queue will be cleared.</param>
public async Task WaitForCompletionAsync(bool fullCompletion)
{
await WaitForCompletionAsync(fullCompletion, Threads);
}
/// <summary>
/// Adds a new item to the queue. NULL values will be ignored.
/// </summary>
/// <param name="o">Item to add to the queue.</param>
public void Add(T o)
{
if (o == null)
return;
lock(WorkQueue)
{
WorkQueue.Enqueue(o);
Monitor.Pulse(WorkQueue);
}
}
/// <summary>
/// If not alredy running, will start the workers.
/// </summary>
public void StartThreads()
{
if (ThreadsRunning)
return;
StartThreadsUnsafe();
}
public void Dispose()
{
WaitForCompletion(false);
threads = null;
WorkQueue = null;
}
#endregion
#region " Private Functions "
private void StartThreadsUnsafe()
{
RunNewTasks = true;
ThreadsRunning = true;
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(ManageQue);
threads[i].Start();
}
}
private void ManageQue()
{
while(true)
{
object passObject = null;
lock(WorkQueue)
{
while (WorkQueue.Count < 1)
Monitor.Wait(WorkQueue);
passObject = WorkQueue.Dequeue();
}
if (passObject == null)
break;
Work?.Invoke((T)passObject);
if (!RunNewTasks)
break;
}
}
#endregion
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment