Skip to content

Instantly share code, notes, and snippets.

@renestein
Created June 10, 2014 11:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save renestein/7698e366a157a0235b09 to your computer and use it in GitHub Desktop.
Save renestein/7698e366a157a0235b09 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace RStein.Async.Schedulers
{
public class IoServiceThreadPoolScheduler : TaskSchedulerBase
{
public const string POOL_THREAD_NAME_FORMAT = "IoServiceThreadPoolSchedulerThread#{0}";
private const int EXPECTED_MIMINUM_THREADS = 1;
private readonly IoServiceScheduler m_ioService;
private readonly Work m_ioServiceWork;
private List<Thread> m_threads;
public IoServiceThreadPoolScheduler(IoServiceScheduler ioService)
: this(ioService, Environment.ProcessorCount)
{
}
public IoServiceThreadPoolScheduler(IoServiceScheduler ioService, int numberOfThreads)
{
if (ioService == null)
{
throw new ArgumentNullException("ioService");
}
if (numberOfThreads < EXPECTED_MIMINUM_THREADS)
{
throw new ArgumentOutOfRangeException("numberOfThreads");
}
m_ioService = ioService;
m_ioServiceWork = new Work(m_ioService);
initThreads(numberOfThreads);
}
public override int MaximumConcurrencyLevel
{
get
{
checkIfDisposed();
return m_threads.Count();
}
}
public override IProxyScheduler ProxyScheduler
{
get
{
checkIfDisposed();
return base.ProxyScheduler;
}
set
{
checkIfDisposed();
m_ioService.ProxyScheduler = value;
base.ProxyScheduler = value;
}
}
public override void QueueTask(Task task)
{
checkIfDisposed();
m_ioService.QueueTask(task);
}
public override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
checkIfDisposed();
return m_ioService.TryExecuteTaskInline(task, taskWasPreviouslyQueued);
}
public override IEnumerable<Task> GetScheduledTasks()
{
checkIfDisposed();
return m_ioService.GetScheduledTasks();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
m_ioServiceWork.Dispose();
m_threads.ForEach(thread => thread.Join());
m_ioService.Dispose();
}
}
private void initThreads(int numberOfThreads)
{
m_threads = Enumerable.Range(0, numberOfThreads)
.Select(threadNumber =>
{
var poolThread = new Thread(() =>
{
try
{
m_ioService.Run();
}
catch (Exception ex)
{
Trace.WriteLine(ex);
if (Debugger.IsAttached)
{
Debugger.Break();
}
else
{
Environment.FailFast(null, ex);
}
}
});
poolThread.IsBackground = true;
poolThread.Name = String.Format(POOL_THREAD_NAME_FORMAT, threadNumber);
return poolThread;
}).ToList();
m_threads.ForEach(thread => thread.Start());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment