Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created December 5, 2020 18:10
Show Gist options
  • Save to11mtm/d5a7532e0e0d93b7392f7e9207ac0cee to your computer and use it in GitHub Desktop.
Save to11mtm/d5a7532e0e0d93b7392f7e9207ac0cee to your computer and use it in GitHub Desktop.
Thread Affinity Task Scheduler for Akka
// based heavily on https://github.com/noseratio/tpl/blob/master/ThreadAffinityTaskScheduler.cs
// //-----------------------------------------------------------------------
// // <copyright file="ThreadAffinityDispatcher.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Akka.Dispatch;
using Helios.Concurrency;
using Akka.Configuration;
namespace Akka.Dispatch
{
internal class AffinityThreadPoolSettings
{
    /// <summary>
    /// Thread type used by the constructor overload that does not take one explicitly.
    /// </summary>
    public const ThreadType DefaultThreadType = ThreadType.Background;

    /// <summary>
    /// Creates settings that use <see cref="DefaultThreadType"/>.
    /// </summary>
    /// <param name="numThreads">Number of threads in the pool; must be at least 1.</param>
    /// <param name="name">Pool name; a generated name is used when null.</param>
    /// <param name="deadlockTimeout">Optional deadlock timeout; when given it must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="numThreads"/> is below 1, or <paramref name="deadlockTimeout"/> is zero or negative.
    /// </exception>
    public AffinityThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null)
        : this(numThreads, DefaultThreadType, name, deadlockTimeout)
    {
    }

    /// <summary>
    /// Creates settings with an explicit thread type.
    /// </summary>
    /// <param name="numThreads">Number of threads in the pool; must be at least 1.</param>
    /// <param name="threadType">Thread type recorded in <see cref="ThreadType"/>.</param>
    /// <param name="name">Pool name; when null, "DedicatedThreadPool-" followed by a new GUID is used.</param>
    /// <param name="deadlockTimeout">Optional deadlock timeout; when given it must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="numThreads"/> is below 1, or <paramref name="deadlockTimeout"/> is zero or negative.
    /// </exception>
    public AffinityThreadPoolSettings(
        int numThreads,
        ThreadType threadType,
        string name = null,
        TimeSpan? deadlockTimeout = null)
    {
        // Guard clauses run in the same order as before: the timeout is checked ahead of the thread count.
        if (deadlockTimeout?.TotalMilliseconds <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(deadlockTimeout),
                string.Format("deadlockTimeout must be null or at least 1ms. Was {0}.", deadlockTimeout));
        }

        if (numThreads <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(numThreads),
                string.Format("numThreads must be at least 1. Was {0}", numThreads));
        }

        NumThreads = numThreads;
        ThreadType = threadType;
        DeadlockTimeout = deadlockTimeout;
        Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid());
    }

    /// <summary>
    /// The total number of threads to run in this thread pool.
    /// </summary>
    public int NumThreads { get; private set; }

    /// <summary>
    /// The type of threads to run in this thread pool.
    /// </summary>
    public ThreadType ThreadType { get; private set; }

    /// <summary>
    /// Optional deadlock-detection timeout; null when none was supplied.
    /// NOTE(review): nothing visible in this file reads this value - confirm a consumer exists.
    /// </summary>
    public TimeSpan? DeadlockTimeout { get; private set; }

    /// <summary>
    /// The pool name: either the caller-supplied name or the generated default.
    /// </summary>
    public string Name { get; private set; }

    /// <summary>
    /// Exception callback. Neither constructor of this class assigns it, so it stays null.
    /// </summary>
    public Action<Exception> ExceptionHandler { get; private set; }

    /// <summary>
    /// Gets the thread stack size, 0 represents the default stack size.
    /// Neither constructor of this class assigns it, so it stays 0.
    /// </summary>
    public int ThreadMaxStackSize { get; private set; }
}
/// <summary>
/// A queued unit of work. The dispatch loop of <see cref="ThreadWithAffinityContext"/> dequeues
/// items of this type and calls <see cref="Execute"/> on each one.
/// </summary>
internal interface IExecuteItem
{
/// <summary>Runs the work this item represents.</summary>
void Execute();
}
/// <summary>
/// Customizes a blocking wait (e.g., with a message pump). A delegate of this type is invoked by
/// <see cref="ThreadWithAffinityContext.Wait"/>, which returns the delegate's result unchanged.
/// </summary>
/// <param name="waitHandles">Native handles to wait on.</param>
/// <param name="waitAll">true to wait for all handles; false to wait for any one of them.</param>
/// <param name="millisecondsTimeout">Timeout in milliseconds.</param>
/// <returns>
/// The wait result; presumably the index of the signaled handle or <see cref="WaitHandle.WaitTimeout"/>,
/// which is what <see cref="WaitHelpers.WaitWithMessageLoop"/> returns.
/// </returns>
public delegate int WaitHelperFunc(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout);
/// <summary>
/// Customizes the blocking retrieval of the next queued item. The dispatch loop of
/// <see cref="ThreadWithAffinityContext"/> calls it with the context's own queue and its cancellation token.
/// </summary>
/// <param name="items">The calling context's own queue of pending items.</param>
/// <param name="token">Token that cancels the blocking retrieval.</param>
/// <returns>The next item to execute.</returns>
internal delegate IExecuteItem TakeFunc(BlockingCollection<IExecuteItem> items, CancellationToken token);
#region ThreadAffinityTaskScheduler
/// <summary>
/// A <see cref="TaskScheduler"/> that runs tasks on a fixed set of dedicated threads, each owned by a
/// <see cref="ThreadWithAffinityContext"/>. Tasks queued on the scheduler go into one shared queue that
/// any of the threads may drain; callbacks posted to a thread's own context are drained only by that
/// thread, so continuations that capture the context resume on the same thread.
/// </summary>
public sealed class ThreadAffinityTaskScheduler : TaskScheduler, IDisposable
{
    /// <summary>Shared queue of tasks queued on this scheduler. Set to null once disposed.</summary>
    private BlockingCollection<IExecuteItem> _items;

    /// <summary>Signals every worker thread to stop dispatching.</summary>
    private readonly CancellationTokenSource _terminationCts;

    /// <summary>One context (and therefore one thread) per unit of concurrency.</summary>
    private readonly List<ThreadWithAffinityContext> _threads;

    /// <summary>
    /// Creates a scheduler with <see cref="AffinityThreadPoolSettings.NumThreads"/> threads.
    /// </summary>
    /// <param name="settings">Pool settings; only the thread count is used here.</param>
    /// <remarks>
    /// <see cref="AffinityThreadPoolSettings.ThreadType"/> describes background/foreground threads, not a
    /// COM apartment state, so it must not be forwarded as <c>staThreads</c>: doing so turned the default
    /// background setting into STA threads without a message pump, and setting the STA state is not
    /// supported outside Windows. The worker threads are always created as background threads by
    /// <see cref="ThreadWithAffinityContext"/>.
    /// </remarks>
    internal ThreadAffinityTaskScheduler(AffinityThreadPoolSettings settings)
        : this(settings.NumThreads, staThreads: false)
    {
    }

    /// <summary>Initializes a new instance of the ThreadAffinityTaskScheduler class with the specified concurrency level.</summary>
    /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
    /// <param name="staThreads">true to put each thread into the STA apartment state before it starts.</param>
    /// <param name="waitHelper">Optional custom blocking wait handed to every thread's context.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numberOfThreads"/> is less than 1.</exception>
    public ThreadAffinityTaskScheduler(int numberOfThreads, bool staThreads = false, WaitHelperFunc waitHelper = null)
    {
        if (numberOfThreads < 1)
            throw new ArgumentOutOfRangeException(nameof(numberOfThreads));

        _items = new BlockingCollection<IExecuteItem>();
        _terminationCts = new CancellationTokenSource();

        // Each context starts its thread inside its constructor. If a later one fails, the threads
        // already running must be stopped, otherwise they would block on the queues forever.
        var threads = new List<ThreadWithAffinityContext>(numberOfThreads);
        try
        {
            for (var i = 0; i < numberOfThreads; i++)
                threads.Add(new ThreadWithAffinityContext(_terminationCts.Token, staThreads, waitHelper, Take));
        }
        catch
        {
            _terminationCts.Cancel();
            foreach (var started in threads)
                started.Dispose();
            _terminationCts.Dispose();
            _items.Dispose();
            _items = null;
            throw;
        }

        _threads = threads;
    }

    /// <summary>
    /// Take a queued task item either from ThreadAffinityTaskScheduler or ThreadWithAffinityContext,
    /// blocks if both queues are empty
    /// </summary>
    /// <param name="items">The calling context's own queue.</param>
    /// <param name="token">Token that cancels the blocking wait.</param>
    /// <returns>The item that was taken.</returns>
    internal IExecuteItem Take(BlockingCollection<IExecuteItem> items, CancellationToken token)
    {
        IExecuteItem item;
        BlockingCollection<IExecuteItem>.TakeFromAny(new[] { _items, items }, out item, token);
        return item;
    }

    /// <summary>Queues a Task to be executed by this scheduler.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
    protected override void QueueTask(Task task)
    {
        // Read the field once: Dispose nulls it, and a null here would otherwise surface as a
        // NullReferenceException instead of a meaningful error.
        var items = _items;
        if (items == null)
            throw new ObjectDisposedException(nameof(ThreadAffinityTaskScheduler));

        items.Add(new TaskItem(task, TryExecuteTask));
    }

    /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
    /// <returns>An enumerable of all tasks currently waiting in the shared queue; empty once disposed.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        var items = _items;
        if (items == null)
            return Enumerable.Empty<Task>();

        return items.OfType<TaskItem>().Select(queued => queued.Task).ToArray();
    }

    /// <summary>Determines whether a Task may be inlined.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>Always false: inlining is not implemented.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        //TODO: call TryExecuteTask(task) if can be done on the same thread
        // not sure if that's possible to implement as we don't have an antecedent task to match the thread
        return false;
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override int MaximumConcurrencyLevel
    {
        get { return _threads.Count; }
    }

    /// <summary>
    /// Request the termination and so the cleanup.
    /// This method blocks until all threads successfully shutdown.
    /// Calling it again after the first call does nothing.
    /// </summary>
    public void Dispose()
    {
        var items = _items;
        if (items == null)
            return;

        // Stop dispatching, then wait for every thread to leave its loop.
        _terminationCts.Cancel();
        foreach (var context in _threads)
            context.Dispose();

        // The threads have exited, so nothing reads the token source or the shared queue any more.
        _terminationCts.Dispose();
        items.Dispose();
        _items = null;
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew</summary>
    public Task Run(Action action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew; the inner task is unwrapped.</summary>
    public Task Run(Func<Task> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew; the inner task is unwrapped.</summary>
    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
    }

    #region TaskItem
    /// <summary>
    /// A simple container to store and execute a task via IExecuteItem.Execute
    /// </summary>
    internal class TaskItem : IExecuteItem
    {
        /// <summary>The task this item carries.</summary>
        public Task Task { get; private set; }

        // Callback that actually runs the task (the scheduler passes TryExecuteTask).
        readonly Func<Task, bool> _executeTask;

        protected TaskItem() { }

        public TaskItem(Task task, Func<Task, bool> executeTask)
        {
            this.Task = task;
            _executeTask = executeTask;
        }

        // IExecuteItem.Execute
        public void Execute()
        {
            _executeTask(this.Task);
        }
    }
    #endregion
}
#endregion
#region ThreadWithAffinityContext
/// <summary>
/// A <see cref="SynchronizationContext"/> that owns one dedicated thread and posts callbacks back to it
/// (was: SingleThreadSynchronizationContext). The thread installs this context on itself and then runs
/// a loop that takes queued items and executes them until cancellation is requested.
/// </summary>
public class ThreadWithAffinityContext : SynchronizationContext, IDisposable
{
    private BlockingCollection<IExecuteItem> _items; // pending callbacks for this thread; null once disposed
    private readonly WaitHelperFunc _waitHelper; // allows to customize the blocking wait (with message pump)
    private readonly TakeFunc _takeFunc; // pump the task queue
    private Thread _thread; // the thread with the sync context installed on it; null once disposed
    private readonly CancellationTokenSource _cts; // ask the thread to end

    /// <summary>
    /// The scheduler obtained on the dedicated thread from TaskScheduler.FromCurrentSynchronizationContext().
    /// </summary>
    public TaskScheduler Scheduler { get; private set; }

    /// <summary>
    /// Construct a standalone instance of ThreadWithAffinityContext
    /// </summary>
    /// <param name="staThread">true to put the thread into the STA apartment state.</param>
    /// <param name="pumpMessages">true to wait with <see cref="WaitHelpers.WaitWithMessageLoop"/>.</param>
    public ThreadWithAffinityContext(bool staThread = true, bool pumpMessages = true) :
        this(CancellationToken.None,
            staThread,
            pumpMessages ? WaitHelpers.WaitWithMessageLoop : (WaitHelperFunc)null,
            ThreadWithAffinityContext.Take)
    {
    }

    /// <summary>
    /// Construct a standalone instance of ThreadWithAffinityContext with cancellation
    /// </summary>
    /// <param name="token">Cancelling this token ends the thread's dispatch loop.</param>
    /// <param name="staThread">true to put the thread into the STA apartment state.</param>
    /// <param name="pumpMessages">true to wait with <see cref="WaitHelpers.WaitWithMessageLoop"/>.</param>
    public ThreadWithAffinityContext(CancellationToken token, bool staThread = true, bool pumpMessages = true) :
        this(token,
            staThread,
            pumpMessages ? WaitHelpers.WaitWithMessageLoop : (WaitHelperFunc)null,
            ThreadWithAffinityContext.Take)
    {
    }

    /// <summary>
    /// Construct an instance of ThreadWithAffinityContext to be used by ThreadAffinityTaskScheduler.
    /// The constructor starts the thread and blocks until the thread has published its scheduler.
    /// </summary>
    /// <param name="token">Cancelling this token ends the thread's dispatch loop.</param>
    /// <param name="staThread">true to put the thread into the STA apartment state.</param>
    /// <param name="waitHelper">Custom blocking wait; when null the base WaitHelper is used.</param>
    /// <param name="takeFunc">Retrieves the next item to run; when null only this context's own queue is read.</param>
    internal ThreadWithAffinityContext(CancellationToken token, bool staThread, WaitHelperFunc waitHelper, TakeFunc takeFunc)
    {
        _items = new BlockingCollection<IExecuteItem>();

        if (takeFunc == null)
            takeFunc = Take;
        _takeFunc = takeFunc;

        if (waitHelper == null)
        {
            _waitHelper = WaitHelper;
        }
        else
        {
            _waitHelper = waitHelper;
            // this makes our override of SynchronizationContext.Wait get called
            base.SetWaitNotificationRequired();
        }

        _cts = CancellationTokenSource.CreateLinkedTokenSource(token);
        // Capture the token once: the Token property throws after the source is disposed,
        // whereas a captured token stays usable.
        var loopToken = _cts.Token;

        // use TCS to return the scheduler created on the new thread to the constructing thread
        var ready = new TaskCompletionSource<TaskScheduler>();
        _thread = new Thread(() =>
        {
            // install on the current thread
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                // the sync. context is ready, return it to the task scheduler
                ready.SetResult(TaskScheduler.FromCurrentSynchronizationContext());

                // the thread's core task dispatching loop, terminate-able with token
                while (true)
                {
                    // execute a Task queued by ThreadAffinityTaskScheduler.QueueTask
                    // or a callback queued by Post
                    var executeItem = _takeFunc(_items, loopToken);
                    executeItem.Execute();
                }
            }
            catch (OperationCanceledException)
            {
                // ignore OperationCanceledException exceptions when terminating
                if (!loopToken.IsCancellationRequested)
                    throw;
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(null);
            }
        });

        try
        {
            // make it an STA thread if requested
            if (staThread)
                _thread.SetApartmentState(ApartmentState.STA);
            _thread.IsBackground = true;
            _thread.Start();
        }
        catch
        {
            // The thread never ran, so Dispose will not be reached through normal use;
            // release what was allocated above before propagating the failure.
            _cts.Dispose();
            _items.Dispose();
            throw;
        }

        this.Scheduler = ready.Task.Result;
    }

    /// <summary>Queues <paramref name="d"/> to run on this context's thread.</summary>
    /// <exception cref="ObjectDisposedException">The context has been disposed.</exception>
    public override void Post(SendOrPostCallback d, object state)
    {
        var items = _items;
        if (items == null)
            throw new ObjectDisposedException(nameof(ThreadWithAffinityContext));

        items.Add(new ActionItem(() => d(state)));
    }

    /// <summary>Synchronous callbacks are not supported.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public override void Send(SendOrPostCallback d, object state)
    {
        //TODO: currently we don't support (and expect) synchronous callbacks
        throw new NotImplementedException();
    }

    /// <summary>Returns this same instance rather than a copy.</summary>
    public override SynchronizationContext CreateCopy()
    {
        // more info: http://stackoverflow.com/q/21062440
        return this;
    }

    /// <summary>Delegates the wait to the configured wait helper.</summary>
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        // this can pump if needed
        return _waitHelper(waitHandles, waitAll, millisecondsTimeout);
    }

    /// <summary>
    /// Requests cancellation, blocks until the thread has exited, then releases the token source
    /// and the queue. Calling it again does nothing.
    /// </summary>
    public void Dispose()
    {
        var thread = _thread;
        if (thread != null)
        {
            // request the cancellation and wait
            _cts.Cancel();
            thread.Join();
            _thread = null;

            // A linked source stays registered with its parent token until it is disposed.
            _cts.Dispose();
        }

        var items = _items;
        if (items != null)
        {
            items.Dispose();
            _items = null;
        }
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew</summary>
    public Task Run(Action action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler);
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew; the inner task is unwrapped.</summary>
    public Task Run(Func<Task> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew; the inner task is unwrapped.</summary>
    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    /// <summary>
    /// Take queued task item, blocks if the queue is empty
    /// </summary>
    internal static IExecuteItem Take(BlockingCollection<IExecuteItem> items, CancellationToken token)
    {
        return items.Take(token);
    }

    /// <summary>
    /// A simple container to store and execute an action via IExecuteItem.Execute
    /// </summary>
    private sealed class ActionItem : IExecuteItem
    {
        readonly Action _action;

        public ActionItem(Action action)
        {
            _action = action;
        }

        // IExecuteItem.Execute
        public void Execute()
        {
            _action();
        }
    }
}
#endregion
#region WaitHelpers
/// <summary>
/// Wait for handle(s) with message loop and timeout
/// </summary>
public static class WaitHelpers
{
    /// <summary>
    /// Wait for handles similar to SynchronizationContext.WaitHelper, but with pumping:
    /// while waiting, pending window messages are removed, translated and dispatched.
    /// </summary>
    /// <param name="waitHandles">Native handles to wait on; must contain at least one handle.</param>
    /// <param name="waitAll">Must be false unless exactly one handle is supplied.</param>
    /// <param name="timeout">Timeout in milliseconds, or <see cref="Timeout.Infinite"/>.</param>
    /// <returns>
    /// The index of the signaled handle, or <see cref="WaitHandle.WaitTimeout"/> when the timeout elapsed.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="waitHandles"/> is null or empty.</exception>
    /// <exception cref="NotSupportedException"><paramref name="waitAll"/> is true for more than one handle.</exception>
    /// <exception cref="AbandonedMutexException">The native wait reported an abandoned mutex.</exception>
    /// <exception cref="InvalidOperationException">The native wait returned an unexpected result.</exception>
    public static int WaitWithMessageLoop(IntPtr[] waitHandles, bool waitAll, int timeout)
    {
        // Don't use CoWaitForMultipleHandles, it has issues with message pumping
        // more info: http://stackoverflow.com/q/21226600/1768303

        // Validate before touching the array: reading Length first would turn a null argument
        // into a NullReferenceException.
        if (waitHandles == null)
            throw new ArgumentNullException(nameof(waitHandles));
        if (waitHandles.Length == 0)
            throw new ArgumentNullException(nameof(waitHandles), "At least one wait handle is required.");

        uint count = (uint)waitHandles.Length;

        // wait for all?
        if (waitAll && count > 1)
        {
            // more: http://blogs.msdn.com/b/cbrumme/archive/2004/02/02/66219.aspx, search for "mutex"
            throw new NotSupportedException("WaitAll for multiple handles on a STA thread is not supported.");
        }

        const uint wakeMask = NativeMethods.QS_ALLINPUT; // message queue status
        uint nativeResult; // result of the native wait API (WaitForMultipleObjects or MsgWaitForMultipleObjectsEx)
        int managedResult; // result to return from WaitHelper

        // optimization: a quick check with a zero timeout
        nativeResult = NativeMethods.WaitForMultipleObjects(count, waitHandles, bWaitAll: false, dwMilliseconds: 0);
        if (IsNativeWaitSuccessful(count, nativeResult, out managedResult))
            return managedResult;

        // proceed to pumping
        // track timeout if not infinite
        Func<bool> hasTimedOut = () => false;
        int remainingTimeout = timeout;
        if (remainingTimeout != Timeout.Infinite)
        {
            int startTick = Environment.TickCount;
            hasTimedOut = () =>
            {
                // Environment.TickCount wraps correctly even if runs continuously
                int lapse = unchecked(Environment.TickCount - startTick);
                remainingTimeout = Math.Max(timeout - lapse, 0);
                return remainingTimeout <= 0;
            };
        }

        // the core loop
        var msg = new NativeMethods.MSG();
        while (true)
        {
            // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
            // even if there's a message already seen but not removed in the message queue.
            // unchecked: Timeout.Infinite (-1) must map to INFINITE (0xFFFFFFFF) even in checked builds.
            nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
                count, waitHandles,
                unchecked((uint)remainingTimeout),
                wakeMask,
                NativeMethods.MWMO_INPUTAVAILABLE);

            if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
                return managedResult;

            // there is a message, pump and dispatch it
            if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
            {
                NativeMethods.TranslateMessage(ref msg);
                NativeMethods.DispatchMessage(ref msg);
            }

            if (hasTimedOut())
                return WaitHandle.WaitTimeout;
        }
    }

    /// <summary>
    /// Analyze the result of the native wait API
    /// </summary>
    /// <param name="count">Number of handles that were waited on.</param>
    /// <param name="nativeResult">Value returned by the native wait function.</param>
    /// <param name="managedResult">
    /// The handle index on success, <see cref="WaitHandle.WaitTimeout"/> on timeout, or the raw
    /// native value when a message is pending.
    /// </param>
    /// <returns>true when a handle was signaled; false on timeout or pending message.</returns>
    static bool IsNativeWaitSuccessful(uint count, uint nativeResult, out int managedResult)
    {
        if (nativeResult == (NativeMethods.WAIT_OBJECT_0 + count))
        {
            // a message is pending, only valid for MsgWaitForMultipleObjectsEx
            managedResult = unchecked((int)nativeResult);
            return false;
        }

        if (nativeResult >= NativeMethods.WAIT_OBJECT_0 && nativeResult < (NativeMethods.WAIT_OBJECT_0 + count))
        {
            managedResult = unchecked((int)(nativeResult - NativeMethods.WAIT_OBJECT_0));
            return true;
        }

        if (nativeResult >= NativeMethods.WAIT_ABANDONED_0 && nativeResult < (NativeMethods.WAIT_ABANDONED_0 + count))
        {
            throw new AbandonedMutexException();
        }

        if (nativeResult == NativeMethods.WAIT_TIMEOUT)
        {
            managedResult = WaitHandle.WaitTimeout;
            return false;
        }

        // Anything else (e.g. WAIT_FAILED) is unexpected; include the raw value to aid diagnosis.
        throw new InvalidOperationException($"Unexpected native wait result 0x{nativeResult:X8}.");
    }

    #region NativeMethods
    /// <summary>
    /// NativeMethods - p/invoke declarations
    /// </summary>
    internal static class NativeMethods
    {
        // Message-queue status flags (wake mask for MsgWaitForMultipleObjectsEx / GetQueueStatus).
        public const uint QS_KEY = 0x0001;
        public const uint QS_MOUSEMOVE = 0x0002;
        public const uint QS_MOUSEBUTTON = 0x0004;
        public const uint QS_POSTMESSAGE = 0x0008;
        public const uint QS_TIMER = 0x0010;
        public const uint QS_PAINT = 0x0020;
        public const uint QS_SENDMESSAGE = 0x0040;
        public const uint QS_HOTKEY = 0x0080;
        public const uint QS_ALLPOSTMESSAGE = 0x0100;
        public const uint QS_RAWINPUT = 0x0400;
        public const uint QS_MOUSE = (QS_MOUSEMOVE | QS_MOUSEBUTTON);
        public const uint QS_INPUT = (QS_MOUSE | QS_KEY | QS_RAWINPUT);
        public const uint QS_ALLEVENTS = (QS_INPUT | QS_POSTMESSAGE | QS_TIMER | QS_PAINT | QS_HOTKEY);
        public const uint QS_ALLINPUT = (QS_INPUT | QS_POSTMESSAGE | QS_TIMER | QS_PAINT | QS_HOTKEY | QS_SENDMESSAGE);

        // Flags for MsgWaitForMultipleObjectsEx.
        public const uint MWMO_INPUTAVAILABLE = 0x0004;
        public const uint MWMO_WAITALL = 0x0001;

        // Flags for PeekMessage.
        public const uint PM_REMOVE = 0x0001;
        public const uint PM_NOREMOVE = 0;

        // Native wait results and timeout values.
        public const uint WAIT_TIMEOUT = 0x00000102;
        public const uint WAIT_FAILED = 0xFFFFFFFF;
        public const uint INFINITE = 0xFFFFFFFF;
        public const uint WAIT_OBJECT_0 = 0;
        public const uint WAIT_ABANDONED_0 = 0x00000080;
        public const uint WAIT_IO_COMPLETION = 0x000000C0;

        [StructLayout(LayoutKind.Sequential)]
        public struct MSG
        {
            public IntPtr hwnd;
            public uint message;
            public IntPtr wParam;
            public IntPtr lParam;
            public uint time;
            public int x;
            public int y;
        }

        [DllImport("user32.dll")]
        [return: MarshalAs(UnmanagedType.Bool)]
        public static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg);

        [DllImport("user32.dll")]
        [return: MarshalAs(UnmanagedType.Bool)]
        public static extern bool PostMessage(IntPtr hWnd, uint msg, IntPtr wParam, IntPtr lParam);

        // NOTE(review): the Win32 SendMessage returns an LRESULT, not a BOOL; this declaration is not
        // called anywhere in this file - confirm the return type before relying on the value.
        [DllImport("user32.dll")]
        [return: MarshalAs(UnmanagedType.Bool)]
        public static extern bool SendMessage(IntPtr hWnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr DispatchMessage([In] ref MSG lpMsg);

        [DllImport("user32.dll")]
        [return: MarshalAs(UnmanagedType.Bool)]
        public static extern bool TranslateMessage([In] ref MSG lpMsg);

        [DllImport("ole32.dll", PreserveSig = false)]
        public static extern void OleInitialize(IntPtr pvReserved);

        [DllImport("ole32.dll", PreserveSig = true)]
        public static extern void OleUninitialize();

        [DllImport("kernel32.dll")]
        public static extern uint GetTickCount();

        [DllImport("user32.dll")]
        public static extern uint GetQueueStatus(uint flags);

        [DllImport("user32.dll", SetLastError = true)]
        public static extern uint MsgWaitForMultipleObjectsEx(
            uint nCount, IntPtr[] pHandles, uint dwMilliseconds, uint dwWakeMask, uint dwFlags);

        [DllImport("kernel32.dll", SetLastError = true)]
        public static extern uint WaitForMultipleObjects(
            uint nCount, IntPtr[] lpHandles, bool bWaitAll, uint dwMilliseconds);

        [DllImport("kernel32.dll", SetLastError = true)]
        public static extern uint WaitForSingleObject(IntPtr hHandle, uint dwMilliseconds);

        [DllImport("kernel32.dll", SetLastError = true)]
        public static extern IntPtr CreateEvent(IntPtr lpEventAttributes, bool bManualReset, bool bInitialState, string lpName);

        [DllImport("kernel32.dll", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        public static extern bool SetEvent(IntPtr hEvent);

        [DllImport("kernel32.dll", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        public static extern bool CloseHandle(IntPtr hObject);
    }
    #endregion
}
#endregion
/// <summary>
/// ThreadAffinityExecutor - an <see cref="ExecutorService"/> that runs each submitted
/// <see cref="IRunnable"/> as a task on its own <see cref="ThreadAffinityTaskScheduler"/>.
///
/// Relevant configuration options, as read by <see cref="ThreadAffinityExecutorServiceFactory"/>
/// (NOTE(review): how the dispatcher/executor type is selected is not visible in this file):
/// <code>
/// my-affinity-dispatcher {
///     throughput = 100
///     dedicated-thread-pool {
///         thread-count = 3 #number of threads
///         #deadlock-timeout = 3s #optional timeout for deadlock detection
///         threadtype = background #values can be "background" or "foreground"
///     }
/// }
/// </code>
/// </summary>
internal sealed class ThreadAffinityExecutor : ExecutorService
{
    // Scheduler that owns the dedicated threads; null after Shutdown.
    private ThreadAffinityTaskScheduler _dedicatedThreadPool;

    // 1 once Shutdown has been requested, otherwise 0.
    private byte _shuttingDown = 0;

    /// <summary>
    /// Creates the executor and the scheduler (with its threads) described by the settings.
    /// </summary>
    /// <param name="id">Identifier passed to the base <see cref="ExecutorService"/>.</param>
    /// <param name="poolSettings">Settings used to create the underlying scheduler.</param>
    public ThreadAffinityExecutor(string id, AffinityThreadPoolSettings poolSettings) : base(id)
    {
        _dedicatedThreadPool = new ThreadAffinityTaskScheduler(poolSettings);
    }

    /// <summary>
    /// Starts a task on the dedicated scheduler that calls <see cref="IRunnable.Run"/> on <paramref name="run"/>.
    /// </summary>
    /// <param name="run">The work to run.</param>
    /// <exception cref="RejectedExecutionException">
    /// This exception is thrown if this method is called during the shutdown of this executor.
    /// </exception>
    public override void Execute(IRunnable run)
    {
        // Read the scheduler once: Shutdown nulls the field, and handing a null scheduler to
        // Task.Start would raise ArgumentNullException instead of a rejection.
        var pool = Volatile.Read(ref _dedicatedThreadPool);
        if (Volatile.Read(ref _shuttingDown) == 1 || pool == null)
            throw new RejectedExecutionException("ThreadAffinityExecutor is shutting down");

        new Task(() => run.Run()).Start(pool);
    }

    // Not referenced in this file; kept in case it is used elsewhere. Private because a sealed
    // class cannot be derived from, so 'protected' had no effect beyond a compiler warning.
    private static readonly WaitCallback Executor = t => { ((IRunnable)t).Run(); };

    /// <summary>
    /// Marks the executor as shutting down and disposes the underlying scheduler.
    /// Calling it again does nothing further.
    /// </summary>
    public override void Shutdown()
    {
        Volatile.Write(ref _shuttingDown, 1);

        // Swap the field to null atomically so the scheduler is disposed by exactly one caller.
        var pool = Interlocked.Exchange(ref _dedicatedThreadPool, null);
        pool?.Dispose();
    }
}
/// <summary>
/// INTERNAL API
///
/// Reads the 'dedicated-thread-pool' section once at construction and produces
/// <see cref="ThreadAffinityExecutor"/> instances from the resulting settings.
/// </summary>
internal sealed class ThreadAffinityExecutorServiceFactory : ExecutorServiceConfigurator
{
    /// <summary>
    /// A 'dedicated-thread-pool' section with one background thread.
    /// NOTE(review): presumably meant for pinned (single-thread) dispatchers - confirm against callers.
    /// </summary>
    public static readonly Config SingleThreadDefault = ConfigurationFactory.ParseString(@"
dedicated-thread-pool{
thread-count = 1
threadtype = background
}
");

    // Settings parsed once in the constructor and shared by every executor this factory produces.
    private readonly AffinityThreadPoolSettings _threadPoolConfiguration;

    /// <summary>
    /// Initializes a new instance of the <see cref="ThreadAffinityExecutorServiceFactory"/> class.
    /// </summary>
    /// <param name="config">Configuration passed to the base configurator.</param>
    /// <param name="prerequisites">Prerequisites passed to the base configurator.</param>
    /// <exception cref="ConfigurationException">
    /// This exception is thrown if the 'dedicated-thread-pool' section is missing or empty.
    /// </exception>
    public ThreadAffinityExecutorServiceFactory(Config config, IDispatcherPrerequisites prerequisites)
        : base(config, prerequisites)
    {
        _threadPoolConfiguration = ConfigureSettings(Config);
    }

    /// <summary>
    /// Creates a new <see cref="ThreadAffinityExecutor"/> from the settings parsed at construction.
    /// </summary>
    /// <param name="id">Identifier given to the new executor.</param>
    /// <returns>The new executor.</returns>
    public override ExecutorService Produce(string id) =>
        new ThreadAffinityExecutor(id, _threadPoolConfiguration);

    // Translates the 'dedicated-thread-pool' section (plus the 'id' key of the enclosing config)
    // into pool settings; throws ConfigurationException when the section is missing or empty.
    private static AffinityThreadPoolSettings ConfigureSettings(Config config)
    {
        var poolSection = config.GetConfig("dedicated-thread-pool");
        if (poolSection.IsNullOrEmpty())
        {
            throw new ConfigurationException(
                $"must define section 'dedicated-thread-pool' for thread-affinity-executor {config.GetString("id", "unknown")}");
        }

        // Values are read in the same order in which they are passed to the settings constructor.
        var threadCount = poolSection.GetInt("thread-count");
        var threadType = DedicatedThreadPoolConfigHelpers.ConfigureThreadType(
            poolSection.GetString("threadtype", ThreadType.Background.ToString()));
        var poolName = config.GetString("id");
        var deadlockTimeout = DedicatedThreadPoolConfigHelpers.GetSafeDeadlockTimeout(poolSection);

        return new AffinityThreadPoolSettings(threadCount, threadType, poolName, deadlockTimeout);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment