Last active
June 3, 2016 18:13
-
-
Save george-polevoy/9d9c356f721344d429e17ee94428feb9 to your computer and use it in GitHub Desktop.
Toy implementation of fixed thread pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Main() | |
{ | |
long countDone = 0; | |
using (var pool = new FixedThreadPool(16)) | |
{ | |
pool.WaitForThreadInitialization(); | |
var totalDepth = 9; | |
var total = (2L << totalDepth) - 1; | |
Console.WriteLine("Total to be scheduled: {0}", total); | |
var countdown = new CountdownEvent((int)total); | |
var sw = Stopwatch.StartNew(); | |
ParameterizedThreadStart recursiveProcess = null; | |
recursiveProcess = o => | |
{ | |
var depth = (long)o; | |
Interlocked.Increment(ref countDone); | |
countdown.Signal(); | |
GC.KeepAlive(Enumerable.Range(0, 1000000).Select(i => (long)i).Sum()); | |
if (depth == 0) | |
{ | |
return; | |
} | |
pool.Schedule(recursiveProcess, depth - 1); | |
pool.Schedule(recursiveProcess, depth - 1); | |
}; | |
pool.Schedule(recursiveProcess, (long)totalDepth); | |
countdown.Wait(); | |
sw.Stop(); | |
Console.WriteLine(sw.Elapsed); | |
Console.WriteLine("Locally: {0} Total {1}", pool.ProcessedLocally, pool.ProcessedTotal); | |
} | |
Console.WriteLine("Done: {0}", countDone); | |
} | |
public class Job | |
{ | |
ParameterizedThreadStart proc; | |
object argument; | |
public Job(ParameterizedThreadStart proc, object argument) | |
{ | |
this.proc = proc; | |
this.argument = argument; | |
} | |
public void Execute() | |
{ | |
var p = proc; | |
var a = argument; | |
proc = null; | |
argument = null; | |
p(a); | |
} | |
} | |
public class ThreadSlot | |
{ | |
static readonly ThreadLocal<ThreadSlot> localThreadSlot = new ThreadLocal<ThreadSlot>(); | |
readonly Thread thread; | |
readonly FixedThreadPool pool; | |
volatile bool stopRequested; | |
readonly ConcurrentQueue<Job> jobs | |
= new ConcurrentQueue<Job>(); | |
public static ThreadSlot GetLocalThreadSlot() | |
{ | |
return localThreadSlot.Value; | |
} | |
public ThreadSlot(FixedThreadPool pool, Thread thread) | |
{ | |
this.pool = pool; | |
this.thread = thread; | |
} | |
public void Initialize() | |
{ | |
localThreadSlot.Value = this; | |
pool.SignalOneInitialized(); | |
} | |
public void Start() | |
{ | |
thread.Start(this); | |
} | |
public int GetPendingCount() | |
{ | |
return jobs.Count; | |
} | |
public Job TryDequeue() | |
{ | |
Job token; | |
if (jobs.TryDequeue(out token)) | |
{ | |
pool.DecrementTotal(); | |
} | |
return token; | |
} | |
public void RequestStop() | |
{ | |
stopRequested = true; | |
} | |
public void Schedule(ParameterizedThreadStart jobProc, object state) | |
{ | |
jobs.Enqueue(new Job(jobProc, state)); | |
pool.IncrementTotal(); | |
} | |
public bool StopRequested() | |
{ | |
return stopRequested; | |
} | |
public void Process() | |
{ | |
while (!StopRequested()) | |
{ | |
var job = TryDequeue(); | |
if (job != null) | |
{ | |
pool.IncProcessedLocally(); | |
pool.IncProcessedTotal(); | |
job.Execute(); | |
} | |
else | |
{ | |
job = pool.WaitForJob(); | |
if (job != null) | |
{ | |
pool.IncProcessedTotal(); | |
job.Execute(); | |
} | |
pool.AllowMoreProcessing(); | |
} | |
} | |
} | |
} | |
public class FixedThreadPool : IDisposable | |
{ | |
List<ThreadSlot> threads; | |
private long disposed; | |
private long total; | |
private long totalRunning; | |
SemaphoreSlim semaphore; | |
long processedLocally; | |
long processedTotal; | |
public void IncProcessedLocally() | |
{ | |
Interlocked.Increment(ref processedLocally); | |
} | |
public void IncProcessedTotal() | |
{ | |
Interlocked.Increment(ref processedTotal); | |
} | |
public long ProcessedLocally | |
{ | |
get { return processedLocally; } | |
} | |
public long ProcessedTotal | |
{ | |
get { return processedTotal; } | |
} | |
public FixedThreadPool(int numThreads) | |
{ | |
threads = new List<ThreadSlot>(numThreads); | |
semaphore = new SemaphoreSlim(0, numThreads); | |
for (var i = 0; i < numThreads; i++) | |
{ | |
var t = new Thread(ThreadProc); | |
t.IsBackground = true; | |
var slot = new ThreadSlot(this, t); | |
threads.Add(slot); | |
} | |
intitializationCountdown = new CountdownEvent(numThreads); | |
currentScheduler = ScheduleUninitialized; | |
for (var i = 0; i < numThreads; i++) | |
{ | |
threads[i].Start(); | |
} | |
} | |
public void IncrementTotal() | |
{ | |
Interlocked.Increment(ref total); | |
AllowMoreProcessing(); | |
} | |
public void AllowMoreProcessing() | |
{ | |
var currentRunning = Interlocked.Increment(ref totalRunning); | |
if (currentRunning < threads.Count + 1 && Interlocked.Read(ref total) > 0) | |
{ | |
semaphore.Release(); | |
} | |
else | |
{ | |
Interlocked.Decrement(ref totalRunning); | |
} | |
} | |
public void DecrementTotal() | |
{ | |
var current = Interlocked.Decrement(ref total); | |
if (current > 0) AllowMoreProcessing(); | |
} | |
private void ScheduleInitialized(ParameterizedThreadStart threadProc, object state) | |
{ | |
ThreadSlot best = null; | |
var local = ThreadSlot.GetLocalThreadSlot(); | |
if (local != null) | |
{ | |
best = local; | |
} | |
if (best == null) | |
{ | |
best = threads[0]; | |
} | |
best.Schedule(threadProc, state); | |
} | |
private volatile Action<ParameterizedThreadStart,object> currentScheduler; | |
private CountdownEvent intitializationCountdown; | |
private void ScheduleUninitialized(ParameterizedThreadStart threadProc, object state) | |
{ | |
Console.WriteLine("Scheduling as uninitialized"); | |
intitializationCountdown.Wait(); | |
currentScheduler = ScheduleInitialized; | |
currentScheduler(threadProc, state); | |
} | |
public void Schedule(ParameterizedThreadStart threadProc, object state) | |
{ | |
currentScheduler(threadProc, state); | |
} | |
internal void SignalOneInitialized() | |
{ | |
intitializationCountdown.Signal(); | |
} | |
public void WaitForThreadInitialization() | |
{ | |
intitializationCountdown.Wait(); | |
} | |
public Job WaitForJob() | |
{ | |
semaphore.Wait(); | |
Job found = null; | |
try | |
{ | |
while (Interlocked.Read(ref total) > 0) | |
{ | |
for (var i = 0; i < threads.Count; i++) | |
{ | |
found = threads[i].TryDequeue(); | |
if (found != null) | |
{ | |
return found; | |
} | |
} | |
} | |
} | |
finally | |
{ | |
Interlocked.Decrement(ref totalRunning); | |
} | |
return found; | |
} | |
static void ThreadProc(object state) | |
{ | |
var slot = (ThreadSlot)state; | |
slot.Initialize(); | |
slot.Process(); | |
} | |
public void Dispose() | |
{ | |
Dispose(true); | |
} | |
~FixedThreadPool() | |
{ | |
Dispose(false); | |
} | |
private void Dispose(bool disposing) | |
{ | |
if (Interlocked.CompareExchange(ref disposed, 1, 0) == 1) | |
return; | |
for (var i = 0; i < threads.Count; i++) | |
{ | |
threads[i].RequestStop(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment