Skip to content

Instantly share code, notes, and snippets.

@kellypleahy
Created May 6, 2011 22:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kellypleahy/959904 to your computer and use it in GitHub Desktop.
Save kellypleahy/959904 to your computer and use it in GitHub Desktop.
thread pool implementation from the advanced threading class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ThreadingApp
{
public interface ITask
{
void Execute();
}
public class ThreadSafeTaskQueue
{
private readonly Queue<ITask> _tasks = new Queue<ITask>();
public void Put(ITask task)
{
lock(_tasks)
_tasks.Enqueue(task);
}
public ITask Get()
{
if (_tasks.Count == 0)
return null;
lock (_tasks)
return _tasks.Count == 0
? null
: _tasks.Dequeue();
}
}
public class Worker
{
private readonly ThreadSafeTaskQueue _taskQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Thread _thread;
public Worker(ThreadSafeTaskQueue taskQueue)
{
_taskQueue = taskQueue;
_cancellationTokenSource = new CancellationTokenSource();
_thread = new Thread(Execute);
}
private void Execute()
{
while(!_cancellationTokenSource.IsCancellationRequested)
{
var task = _taskQueue.Get();
if(task == null)
Thread.Sleep(100);
else
task.Execute();
}
}
public Worker Start()
{
_thread.Start();
return this;
}
public Worker StartShutdown()
{
_cancellationTokenSource.Cancel();
return this;
}
public Worker WaitForShutdown()
{
_thread.Join();
Console.WriteLine("Shut down thread {0}", _thread.ManagedThreadId);
return this;
}
}
public class ThreadPool
{
private readonly List<Worker> _workers = new List<Worker>();
private readonly ThreadSafeTaskQueue _taskQueue = new ThreadSafeTaskQueue();
public ThreadPool(int threads)
{
_workers.AddRange(
Enumerable.Range(0, threads)
.Select(i => new Worker(_taskQueue)));
foreach(var w in _workers)
w.Start();
}
public void AddTask(ITask task)
{
_taskQueue.Put(task);
}
public void Shutdown()
{
foreach (var w in _workers)
w.StartShutdown();
foreach(var w in _workers)
w.WaitForShutdown();
}
public void Resize(int newSize)
{
List<Worker> shuttingDownWorkers = null;
lock(_workers)
{
Console.WriteLine("Changing pool size from {0} to {1}.", _workers.Count, newSize);
while (_workers.Count < newSize)
_workers.Add(new Worker(_taskQueue).Start());
if (_workers.Count <= newSize)
return;
shuttingDownWorkers = _workers.Skip(newSize).ToList();
_workers.RemoveRange(newSize, shuttingDownWorkers.Count);
}
foreach (var w in shuttingDownWorkers)
w.StartShutdown();
foreach (var w in shuttingDownWorkers)
w.WaitForShutdown();
}
}
public class Holder
{
public int Value;
}
public class SampleTask : ITask
{
private readonly Holder _holder;
public SampleTask(Holder holder)
{
_holder = holder;
}
public void Execute()
{
lock (_holder)
{
var result = ++_holder.Value;
Console.WriteLine(result + " from " + Thread.CurrentThread.ManagedThreadId);
}
}
}
class Program
{
static void Main(string[] args)
{
var pool = new ThreadPool(10);
var holder = new Holder();
var cancellationSource = new CancellationTokenSource();
new Thread(() => EnqueueWork(cancellationSource, pool, holder)).Start();
while (true)
{
var key = Console.ReadKey();
if (key.Key == ConsoleKey.Enter)
{
Console.WriteLine("Cancel requested.");
cancellationSource.Cancel();
pool.Shutdown();
}
else if(key.KeyChar >= '0' && key.KeyChar <= '9')
{
var numThreads = key.KeyChar - '0';
if (numThreads == 0)
numThreads = 10;
pool.Resize(numThreads);
}
}
}
private static void EnqueueWork(CancellationTokenSource cancellationSource, ThreadPool pool, Holder holder)
{
while(!cancellationSource.IsCancellationRequested)
{
pool.AddTask(new SampleTask(holder));
Thread.Sleep(25);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment