Skip to content

Instantly share code, notes, and snippets.

@george-polevoy
Last active June 3, 2016 18:13
Show Gist options
  • Save george-polevoy/9d9c356f721344d429e17ee94428feb9 to your computer and use it in GitHub Desktop.
Save george-polevoy/9d9c356f721344d429e17ee94428feb9 to your computer and use it in GitHub Desktop.
Toy implementation of a fixed-size thread pool
// Demo driver (LINQPad-style entry point): schedules a binary tree of jobs on a
// 16-thread FixedThreadPool, waits until every job has finished, and prints the
// elapsed time together with the pool's processing counters.
void Main()
{
    long countDone = 0;
    using (var pool = new FixedThreadPool(16))
    {
        pool.WaitForThreadInitialization();
        var totalDepth = 9;
        // Every job with depth > 0 schedules two children, so levels totalDepth..0
        // form a full binary tree with 2^(totalDepth + 1) - 1 nodes.
        var total = (2L << totalDepth) - 1;
        Console.WriteLine("Total to be scheduled: {0}", total);
        var countdown = new CountdownEvent((int)total);
        var sw = Stopwatch.StartNew();
        ParameterizedThreadStart recursiveProcess = null;
        recursiveProcess = o =>
        {
            var depth = (long)o;
            Interlocked.Increment(ref countDone);
            // Synthetic CPU-bound work; GC.KeepAlive merely consumes the result.
            GC.KeepAlive(Enumerable.Range(0, 1000000).Select(i => (long)i).Sum());
            if (depth > 0)
            {
                pool.Schedule(recursiveProcess, depth - 1);
                pool.Schedule(recursiveProcess, depth - 1);
            }
            // Signal only after this job's work and child scheduling are complete.
            // Signalling at the start of the job would let the countdown reach zero
            // while the last jobs are still running, so the stopwatch would stop
            // early and the pool would be disposed underneath running work.
            countdown.Signal();
        };
        pool.Schedule(recursiveProcess, (long)totalDepth);
        countdown.Wait();
        sw.Stop();
        Console.WriteLine(sw.Elapsed);
        Console.WriteLine("Locally: {0} Total {1}", pool.ProcessedLocally, pool.ProcessedTotal);
    }
    // Atomic read: the counter was updated from worker threads via Interlocked.
    Console.WriteLine("Done: {0}", Interlocked.Read(ref countDone));
}
/// <summary>
/// A one-shot unit of work: a delegate paired with the argument it will be invoked with.
/// </summary>
public class Job
{
    ParameterizedThreadStart callback;
    object state;

    /// <summary>Captures the delegate and its argument for a later <see cref="Execute"/> call.</summary>
    /// <param name="proc">Delegate to invoke.</param>
    /// <param name="argument">Value passed to <paramref name="proc"/> when the job runs.</param>
    public Job(ParameterizedThreadStart proc, object argument)
    {
        state = argument;
        callback = proc;
    }

    /// <summary>
    /// Invokes the stored delegate with the stored argument. Both references are
    /// cleared before the call, so the job no longer holds on to them afterwards
    /// and a second call finds no delegate to invoke.
    /// </summary>
    public void Execute()
    {
        ParameterizedThreadStart toRun = callback;
        object toPass = state;

        // Drop the stored references before running user code.
        state = null;
        callback = null;

        toRun(toPass);
    }
}
/// <summary>
/// Per-worker state of a <see cref="FixedThreadPool"/>: the worker thread, its own
/// job queue and a stop flag. The slot running on the current thread is published
/// through a thread-local so code running on a worker can find its own slot.
/// </summary>
public class ThreadSlot
{
    static readonly ThreadLocal<ThreadSlot> localThreadSlot = new ThreadLocal<ThreadSlot>();

    readonly Thread thread;
    readonly FixedThreadPool pool;
    readonly ConcurrentQueue<Job> jobs = new ConcurrentQueue<Job>();
    volatile bool stopRequested;

    /// <summary>Creates a slot bound to the given pool and (not yet started) thread.</summary>
    public ThreadSlot(FixedThreadPool pool, Thread thread)
    {
        this.thread = thread;
        this.pool = pool;
    }

    /// <summary>Returns the slot registered for the calling thread, or null if none was registered.</summary>
    public static ThreadSlot GetLocalThreadSlot()
    {
        return localThreadSlot.Value;
    }

    /// <summary>
    /// Registers this slot as the calling thread's slot and tells the pool that
    /// one more worker has been initialized.
    /// </summary>
    public void Initialize()
    {
        localThreadSlot.Value = this;
        pool.SignalOneInitialized();
    }

    /// <summary>Starts the underlying thread, handing it this slot as its start argument.</summary>
    public void Start()
    {
        thread.Start(this);
    }

    /// <summary>Number of jobs currently sitting in this slot's queue.</summary>
    public int GetPendingCount()
    {
        return jobs.Count;
    }

    /// <summary>
    /// Takes one job off this slot's queue. On success the pool-wide queued-job
    /// counter is decremented.
    /// </summary>
    /// <returns>The dequeued job, or null when the queue was empty.</returns>
    public Job TryDequeue()
    {
        Job dequeued;
        bool gotOne = jobs.TryDequeue(out dequeued);
        if (gotOne)
        {
            pool.DecrementTotal();
        }
        return dequeued;
    }

    /// <summary>Asks the processing loop to exit; the flag is checked between iterations.</summary>
    public void RequestStop()
    {
        stopRequested = true;
    }

    /// <summary>Whether <see cref="RequestStop"/> has been called.</summary>
    public bool StopRequested()
    {
        return stopRequested;
    }

    /// <summary>
    /// Queues a new job on this slot and then bumps the pool-wide queued-job counter.
    /// </summary>
    /// <param name="jobProc">Delegate the job will invoke.</param>
    /// <param name="state">Argument passed to <paramref name="jobProc"/>.</param>
    public void Schedule(ParameterizedThreadStart jobProc, object state)
    {
        var job = new Job(jobProc, state);
        jobs.Enqueue(job);
        // The counter is raised only after the job is visible in the queue.
        pool.IncrementTotal();
    }

    /// <summary>
    /// Worker loop. Runs jobs from this slot's own queue first; when that queue is
    /// empty it asks the pool for a job via <c>WaitForJob</c>. Exits once a stop
    /// has been requested and the current iteration has finished.
    /// </summary>
    public void Process()
    {
        while (!StopRequested())
        {
            Job own = TryDequeue();
            if (own != null)
            {
                // Served from our own queue: counts as both "local" and "total".
                pool.IncProcessedLocally();
                pool.IncProcessedTotal();
                own.Execute();
                continue;
            }

            // Nothing queued locally: let the pool hand us a job (may be null).
            Job taken = pool.WaitForJob();
            if (taken != null)
            {
                pool.IncProcessedTotal();
                taken.Execute();
            }
            pool.AllowMoreProcessing();
        }
    }
}
/// <summary>
/// Toy thread pool with a fixed number of dedicated background worker threads.
/// Each worker owns a <see cref="ThreadSlot"/> with its own queue; a worker whose
/// queue is empty parks on a semaphore in <see cref="WaitForJob"/> and, once
/// released, scans every slot's queue for a job to take.
/// </summary>
public class FixedThreadPool : IDisposable
{
    List<ThreadSlot> threads;
    private long disposed;
    // Number of jobs currently queued across all slots (see IncrementTotal/DecrementTotal).
    private long total;
    // Semaphore permits handed out by AllowMoreProcessing and not yet given back in WaitForJob.
    private long totalRunning;
    SemaphoreSlim semaphore;
    long processedLocally;
    long processedTotal;
    private volatile Action<ParameterizedThreadStart, object> currentScheduler;
    private CountdownEvent initializationCountdown;
    // Cancelled by Dispose so that workers parked in WaitForJob wake up and can
    // observe their stop flag. Deliberately never disposed: workers may still
    // read its token while they are shutting down.
    private readonly CancellationTokenSource shutdown = new CancellationTokenSource();

    /// <summary>Records that a worker executed a job taken from its own queue.</summary>
    public void IncProcessedLocally()
    {
        Interlocked.Increment(ref processedLocally);
    }

    /// <summary>Records that a worker executed a job (from any queue).</summary>
    public void IncProcessedTotal()
    {
        Interlocked.Increment(ref processedTotal);
    }

    /// <summary>Number of jobs that workers executed from their own queue.</summary>
    public long ProcessedLocally
    {
        // Interlocked.Read keeps the 64-bit read atomic on 32-bit platforms.
        get { return Interlocked.Read(ref processedLocally); }
    }

    /// <summary>Number of jobs executed by all workers.</summary>
    public long ProcessedTotal
    {
        get { return Interlocked.Read(ref processedTotal); }
    }

    /// <summary>Creates the pool and starts <paramref name="numThreads"/> background workers.</summary>
    /// <param name="numThreads">Number of worker threads; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numThreads"/> is zero or negative.</exception>
    public FixedThreadPool(int numThreads)
    {
        // Fail fast with a clear message instead of relying on List/SemaphoreSlim
        // to reject the value part-way through construction.
        if (numThreads <= 0)
        {
            throw new ArgumentOutOfRangeException("numThreads", numThreads, "The pool needs at least one worker thread.");
        }

        threads = new List<ThreadSlot>(numThreads);
        semaphore = new SemaphoreSlim(0, numThreads);
        for (var i = 0; i < numThreads; i++)
        {
            var t = new Thread(ThreadProc);
            t.IsBackground = true;
            threads.Add(new ThreadSlot(this, t));
        }

        // Both must be in place before any worker starts: every worker signals the
        // countdown from ThreadSlot.Initialize.
        initializationCountdown = new CountdownEvent(numThreads);
        currentScheduler = ScheduleUninitialized;

        // Start workers only after the slot list is complete, because running
        // workers enumerate it in WaitForJob.
        for (var i = 0; i < numThreads; i++)
        {
            threads[i].Start();
        }
    }

    /// <summary>
    /// Notes that one more job has been queued and tries to release a parked worker.
    /// </summary>
    public void IncrementTotal()
    {
        Interlocked.Increment(ref total);
        AllowMoreProcessing();
    }

    /// <summary>
    /// Releases one semaphore permit if fewer permits than worker threads are
    /// outstanding and at least one job is queued; otherwise does nothing.
    /// </summary>
    public void AllowMoreProcessing()
    {
        // Optimistically claim a permit, then roll the claim back if it is not warranted.
        var currentRunning = Interlocked.Increment(ref totalRunning);
        if (currentRunning < threads.Count + 1 && Interlocked.Read(ref total) > 0)
        {
            semaphore.Release();
        }
        else
        {
            Interlocked.Decrement(ref totalRunning);
        }
    }

    /// <summary>
    /// Notes that one job has been dequeued; if more jobs remain queued, tries to
    /// release another parked worker.
    /// </summary>
    public void DecrementTotal()
    {
        var current = Interlocked.Decrement(ref total);
        if (current > 0)
        {
            AllowMoreProcessing();
        }
    }

    // Steady-state scheduler: queue on the calling thread's own slot when it has
    // one, otherwise on the first slot.
    // NOTE(review): the thread-local slot lookup is static, so a call made from a
    // worker of a different pool instance would queue on that other pool's slot.
    private void ScheduleInitialized(ParameterizedThreadStart threadProc, object state)
    {
        ThreadSlot target = ThreadSlot.GetLocalThreadSlot() ?? threads[0];
        target.Schedule(threadProc, state);
    }

    // Initial scheduler: blocks until every worker has initialized, then switches
    // the pool over to ScheduleInitialized and forwards the request.
    private void ScheduleUninitialized(ParameterizedThreadStart threadProc, object state)
    {
        Console.WriteLine("Scheduling as uninitialized");
        initializationCountdown.Wait();
        currentScheduler = ScheduleInitialized;
        currentScheduler(threadProc, state);
    }

    /// <summary>Queues <paramref name="threadProc"/> to be invoked with <paramref name="state"/> on a worker.</summary>
    public void Schedule(ParameterizedThreadStart threadProc, object state)
    {
        currentScheduler(threadProc, state);
    }

    // Called by each worker (via ThreadSlot.Initialize) once it is up and running.
    internal void SignalOneInitialized()
    {
        initializationCountdown.Signal();
    }

    /// <summary>Blocks until every worker thread has reported that it is initialized.</summary>
    public void WaitForThreadInitialization()
    {
        initializationCountdown.Wait();
    }

    /// <summary>
    /// Parks the calling worker until a permit is released, then scans all slots
    /// for a queued job.
    /// </summary>
    /// <returns>
    /// A job taken from some slot's queue, or null when no job was queued any more
    /// or the pool has been disposed.
    /// </returns>
    public Job WaitForJob()
    {
        try
        {
            semaphore.Wait(shutdown.Token);
        }
        catch (OperationCanceledException)
        {
            // The pool is shutting down. No permit was consumed, so totalRunning
            // must not be decremented here.
            return null;
        }

        try
        {
            while (Interlocked.Read(ref total) > 0)
            {
                for (var i = 0; i < threads.Count; i++)
                {
                    Job found = threads[i].TryDequeue();
                    if (found != null)
                    {
                        return found;
                    }
                }
            }
            return null;
        }
        finally
        {
            // Give back the permit that AllowMoreProcessing accounted for.
            Interlocked.Decrement(ref totalRunning);
        }
    }

    // Entry point of every worker thread; the start argument is the worker's slot.
    static void ThreadProc(object state)
    {
        var slot = (ThreadSlot)state;
        slot.Initialize();
        slot.Process();
    }

    /// <summary>
    /// Asks all workers to stop and wakes the ones that are parked waiting for
    /// work. Workers finish the job they are currently running; jobs still queued
    /// are not executed. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        // Cleanup has already happened; no need for the finalizer to run.
        GC.SuppressFinalize(this);
    }

    ~FixedThreadPool()
    {
        Dispose(false);
    }

    private void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref disposed, 1, 0) == 1)
            return;

        // threads is null when the constructor threw before creating the list; the
        // finalizer still runs in that case and must not throw.
        if (threads != null)
        {
            for (var i = 0; i < threads.Count; i++)
            {
                threads[i].RequestStop();
            }
        }

        if (disposing)
        {
            // Stop flags are set first so that every worker woken by the
            // cancellation sees its flag and leaves its processing loop.
            shutdown.Cancel();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment