Created
December 5, 2020 18:10
-
-
Save to11mtm/d5a7532e0e0d93b7392f7e9207ac0cee to your computer and use it in GitHub Desktop.
Thread Affinity Task Scheduler for Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// based heavily on https://github.com/noseratio/tpl/blob/master/ThreadAffinityTaskScheduler.cs | |
// //----------------------------------------------------------------------- | |
// // <copyright file="ThreadAffinityDispatcher.cs" company="Akka.NET Project"> | |
// // Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com> | |
// // Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net> | |
// // </copyright> | |
// //----------------------------------------------------------------------- | |
using System.Threading.Tasks; | |
using System.Runtime.InteropServices; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using Akka.Dispatch; | |
using Helios.Concurrency; | |
using Akka.Configuration; | |
namespace Akka.Dispatch | |
{ | |
internal class AffinityThreadPoolSettings | |
{ | |
/// <summary> | |
/// Background threads are the default thread type | |
/// </summary> | |
public const ThreadType DefaultThreadType = ThreadType.Background; | |
public AffinityThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null) | |
: this(numThreads, DefaultThreadType, name, deadlockTimeout) | |
{ } | |
public AffinityThreadPoolSettings( | |
int numThreads, | |
ThreadType threadType, | |
string name = null, | |
TimeSpan? deadlockTimeout = null) | |
{ | |
Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid()); | |
ThreadType = threadType; | |
NumThreads = numThreads; | |
DeadlockTimeout = deadlockTimeout; | |
if (deadlockTimeout.HasValue && deadlockTimeout.Value.TotalMilliseconds <= 0) | |
throw new ArgumentOutOfRangeException("deadlockTimeout", string.Format("deadlockTimeout must be null or at least 1ms. Was {0}.", deadlockTimeout)); | |
if (numThreads <= 0) | |
throw new ArgumentOutOfRangeException("numThreads", string.Format("numThreads must be at least 1. Was {0}", numThreads)); | |
} | |
/// <summary> | |
/// The total number of threads to run in this thread pool. | |
/// </summary> | |
public int NumThreads { get; private set; } | |
/// <summary> | |
/// The type of threads to run in this thread pool. | |
/// </summary> | |
public ThreadType ThreadType { get; private set; } | |
/// <summary> | |
/// Interval to check for thread deadlocks. | |
/// | |
/// If a thread takes longer than <see cref="DeadlockTimeout"/> it will be aborted | |
/// and replaced. | |
/// </summary> | |
public TimeSpan? DeadlockTimeout { get; private set; } | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public string Name { get; private set; } | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public Action<Exception> ExceptionHandler { get; private set; } | |
/// <summary> | |
/// Gets the thread stack size, 0 represents the default stack size. | |
/// </summary> | |
public int ThreadMaxStackSize { get; private set; } | |
} | |
internal interface IExecuteItem | |
{ | |
void Execute(); | |
} | |
/// <summary>Customize a blocking wait (e.g., with a message pump)</summary> | |
public delegate int WaitHelperFunc(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout); | |
/// <summary>Customize a blocking retrieval of a queued task item</summary> | |
internal delegate IExecuteItem TakeFunc(BlockingCollection<IExecuteItem> items, CancellationToken token); | |
#region ThreadAffinityTaskScheduler | |
/// <summary> | |
/// Provides a pool of single-threaded apartments, with optional message pumping | |
/// Each apartment provides asynchronous continuation after `await` on the same thread, | |
/// </summary> | |
public sealed class ThreadAffinityTaskScheduler : TaskScheduler, IDisposable | |
{ | |
/// <summary>Stores the queued tasks to be executed by one of our apartments.</summary> | |
private BlockingCollection<IExecuteItem> _items; | |
/// <summary>The Cancellation Token Source object to request the termination.</summary> | |
private readonly CancellationTokenSource _terminationCts; | |
/// <summary>The list of SingleThreadSynchronizationContext objects to represent apartments (threads).</summary> | |
private readonly List<ThreadWithAffinityContext> _threads; | |
internal ThreadAffinityTaskScheduler(AffinityThreadPoolSettings settings):this(settings.NumThreads, settings.ThreadType==ThreadType.Background) | |
{ | |
} | |
/// <summary>Initializes a new instance of the ThreadAffinityTaskScheduler class with the specified concurrency level.</summary> | |
/// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param> | |
public ThreadAffinityTaskScheduler(int numberOfThreads, bool staThreads = false, WaitHelperFunc waitHelper = null) | |
{ | |
// Validate arguments | |
if (numberOfThreads < 1) | |
throw new ArgumentOutOfRangeException("numberOfThreads"); | |
// Initialize the tasks collection | |
_items = new BlockingCollection<IExecuteItem>(); | |
// CTS to cancel task dispatching | |
_terminationCts = new CancellationTokenSource(); | |
// Create the contexts (threads) to be used by this scheduler | |
_threads = Enumerable.Range(0, numberOfThreads).Select(i => | |
new ThreadWithAffinityContext(_terminationCts.Token, staThreads, waitHelper, Take)).ToList(); | |
} | |
/// <summary> | |
/// Take a queued task item either from ThreadAffinityTaskScheduler or ThreadWithAffinityContext, | |
/// blocks if both queues are empty | |
/// </summary> | |
internal IExecuteItem Take(BlockingCollection<IExecuteItem> items, CancellationToken token) | |
{ | |
IExecuteItem item; | |
BlockingCollection<IExecuteItem>.TakeFromAny(new[] { _items, items }, out item, token); | |
return item; | |
} | |
/// <summary>Queues a Task to be executed by this scheduler.</summary> | |
/// <param name="task">The task to be executed.</param> | |
protected override void QueueTask(Task task) | |
{ | |
// Push the task into the blocking collection of tasks | |
_items.Add(new TaskItem(task, this.TryExecuteTask)); | |
} | |
/// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary> | |
/// <returns>An enumerable of all tasks currently scheduled.</returns> | |
protected override IEnumerable<Task> GetScheduledTasks() | |
{ | |
// Serialize the contents of the blocking collection of tasks for the debugger | |
return _items.Select(t => ((TaskItem)t).Task).ToArray(); | |
} | |
/// <summary>Determines whether a Task may be inlined.</summary> | |
/// <param name="task">The task to be executed.</param> | |
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param> | |
/// <returns>true if the task was successfully inlined; otherwise, false.</returns> | |
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) | |
{ | |
//TODO: call TryExecuteTask(task) if can be done on the same thread | |
// not sure if that's possible to implement as we don't have an antecedent task to match the thread | |
return false; | |
} | |
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> | |
public override int MaximumConcurrencyLevel | |
{ | |
get { return _threads.Count; } | |
} | |
/// <summary> | |
/// Request the termination and so the cleanup. | |
/// This method blocks until all threads successfully shutdown. | |
/// </summary> | |
public void Dispose() | |
{ | |
if (_items != null) | |
{ | |
// Request the cancellation | |
_terminationCts.Cancel(); | |
// Dispose each context | |
foreach (var context in _threads) | |
context.Dispose(); | |
// Cleanup | |
_items.Dispose(); | |
_items = null; | |
} | |
} | |
/// <summary>A handy wrapper around Task.Factory.StartNew</summary> | |
public Task Run(Action action, CancellationToken token) | |
{ | |
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this); | |
} | |
/// <summary>A handy wrapper around Task.Factory.StartNew</summary> | |
public Task Run(Func<Task> action, CancellationToken token) | |
{ | |
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap(); | |
} | |
/// <summary>A handy wrapper around Task.Factory.StartNew</summary> | |
public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token) | |
{ | |
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap(); | |
} | |
#region TaskItem | |
/// <summary> | |
/// A simple container to store and execute a task via IExecuteItem.Execute | |
/// </summary> | |
internal class TaskItem : IExecuteItem | |
{ | |
public Task Task { get; private set; } | |
readonly Func<Task, bool> _executeTask; | |
protected TaskItem() { } | |
public TaskItem(Task task, Func<Task, bool> executeTask) | |
{ | |
this.Task = task; | |
_executeTask = executeTask; | |
} | |
// IExecuteItem.Execute | |
public void Execute() | |
{ | |
_executeTask(this.Task); | |
} | |
} | |
#endregion | |
} | |
#endregion | |
#region ThreadWithAffinityContext | |
/// <summary> | |
/// A helper Synchronization Context class to post continuations on the same thread | |
/// (was: SingleThreadSynchronizationContext) | |
/// </summary> | |
public class ThreadWithAffinityContext : SynchronizationContext, IDisposable | |
{ | |
private BlockingCollection<IExecuteItem> _items; // pending tasks | |
WaitHelperFunc _waitHelper; // allows to customize the blocking wait (with message pump) | |
TakeFunc _takeFunc; // pump the task queue | |
Thread _thread; // the thread with the sync context installed on it | |
CancellationTokenSource _cts; // ask the thread to end | |
/// <summary>// TaskScheduler.FromCurrentSynchronizationContext()</summary> | |
public TaskScheduler Scheduler { get; private set; } | |
/// <summary> | |
/// Construct a standaline instance of ThreadWithAffinityContext | |
/// </summary> | |
public ThreadWithAffinityContext(bool staThread = true, bool pumpMessages = true) : | |
this(CancellationToken.None, | |
staThread, | |
pumpMessages ? WaitHelpers.WaitWithMessageLoop : (WaitHelperFunc)null, | |
ThreadWithAffinityContext.Take) | |
{ | |
} | |
/// <summary> | |
/// Construct a standaline instance of ThreadWithAffinityContext with cancellation | |
/// </summary> | |
public ThreadWithAffinityContext(CancellationToken token, bool staThread = true, bool pumpMessages = true) : | |
this(token, | |
staThread, | |
pumpMessages ? WaitHelpers.WaitWithMessageLoop : (WaitHelperFunc)null, | |
ThreadWithAffinityContext.Take) | |
{ | |
} | |
/// <summary> | |
/// Construct an instance of ThreadWithAffinityContext to used by ThreadAffinityTaskScheduler | |
/// </summary> | |
internal ThreadWithAffinityContext(CancellationToken token, bool staThread, WaitHelperFunc waitHelper, TakeFunc takeFunc) | |
{ | |
_waitHelper = waitHelper; | |
_items = new BlockingCollection<IExecuteItem>(); | |
_takeFunc = takeFunc; | |
_cts = CancellationTokenSource.CreateLinkedTokenSource(token); | |
if (_takeFunc == null) | |
_takeFunc = Take; | |
// this makes our override of SynchronizationContext.Wait get called | |
if (_waitHelper == null) | |
_waitHelper = WaitHelper; | |
else | |
base.SetWaitNotificationRequired(); | |
// use TCS to return SingleThreadSynchronizationContext to the task scheduler | |
var tcs = new TaskCompletionSource<TaskScheduler>(); | |
_thread = new Thread(() => | |
{ | |
// install on the current thread | |
SynchronizationContext.SetSynchronizationContext(this); | |
try | |
{ | |
tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext()); // the sync. context is ready, return it to the task scheduler | |
// the thread's core task dispatching loop, terminate-able with token | |
while (true) | |
{ | |
var executeItem = _takeFunc(_items, _cts.Token); | |
// execute a Task queued by ThreadAffinityTaskScheduler.QueueTask | |
// or a callback queued by SingleThreadSynchronizationContext.Post | |
executeItem.Execute(); | |
}; | |
} | |
catch (OperationCanceledException) | |
{ | |
// ignore OperationCanceledException exceptions when terminating | |
if (!_cts.Token.IsCancellationRequested) | |
throw; | |
} | |
finally | |
{ | |
SynchronizationContext.SetSynchronizationContext(null); | |
} | |
}); | |
// make it an STA thread if message pumping is requested | |
if (staThread) | |
_thread.SetApartmentState(ApartmentState.STA); | |
_thread.IsBackground = true; | |
_thread.Start(); | |
this.Scheduler = tcs.Task.Result; | |
} | |
public override void Post(SendOrPostCallback d, object state) | |
{ | |
_items.Add(new ActionItem(() => d(state))); | |
} | |
public override void Send(SendOrPostCallback d, object state) | |
{ | |
//TODO: currently we don't support (and expect) synchronous callbacks | |
throw new NotImplementedException(); | |
} | |
public override SynchronizationContext CreateCopy() | |
{ | |
// more info: http://stackoverflow.com/q/21062440 | |
return this; | |
} | |
public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) | |
{ | |
// this can pump if needed | |
return _waitHelper(waitHandles, waitAll, millisecondsTimeout); | |
} | |
public void Dispose() | |
{ | |
// cleanup | |
if (_thread != null) | |
{ | |
// request the cancellation and wait | |
_cts.Cancel(); | |
_thread.Join(); | |
_thread = null; | |
} | |
if (_items != null) | |
{ | |
_items.Dispose(); | |
_items = null; | |
} | |
} | |
/// <summary>A handy wrapper around Task.Factory.StartNew</summary> | |
public Task Run(Action action, CancellationToken token) | |
{ | |
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler); | |
} | |
/// <summary>A handy wrapper around Task.Factory.StartNew</summary> | |
public Task Run(Func<Task> action, CancellationToken token) | |
{ | |
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap(); | |
} | |
/// <summary>A handy wrapper around Task.Factory.StartNew</summary> | |
public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token) | |
{ | |
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap(); | |
} | |
/// <summary> | |
/// Take queued task item, blocks if the queue is empty | |
/// </summary> | |
static internal IExecuteItem Take(BlockingCollection<IExecuteItem> items, CancellationToken token) | |
{ | |
return items.Take(token); | |
} | |
/// <summary> | |
/// A simple container to store and execute an action via IExecuteItem.Execute | |
/// </summary> | |
class ActionItem : IExecuteItem | |
{ | |
readonly Action _action; | |
protected ActionItem() { } | |
public ActionItem(Action action) | |
{ | |
_action = action; | |
} | |
// IExecuteItem.Execute | |
public void Execute() | |
{ | |
_action(); | |
} | |
} | |
} | |
#endregion | |
#region WaitHelpers | |
/// <summary> | |
/// Wait for handle(s) with message loop and timeout | |
/// </summary> | |
public static class WaitHelpers | |
{ | |
/// <summary> | |
/// Wait for handles similar to SynchronizationContext.WaitHelper, but with pumping | |
/// </summary> | |
public static int WaitWithMessageLoop(IntPtr[] waitHandles, bool waitAll, int timeout) | |
{ | |
// Don't use CoWaitForMultipleHandles, it has issues with message pumping | |
// more info: http://stackoverflow.com/q/21226600/1768303 | |
const uint QS_MASK = NativeMethods.QS_ALLINPUT; // message queue status | |
uint count = (uint)waitHandles.Length; | |
if (waitHandles == null || count == 0) | |
throw new ArgumentNullException(); | |
uint nativeResult; // result of the native wait API (WaitForMultipleObjects or MsgWaitForMultipleObjectsEx) | |
int managedResult; // result to return from WaitHelper | |
// wait for all? | |
if (waitAll && count > 1) | |
{ | |
// more: http://blogs.msdn.com/b/cbrumme/archive/2004/02/02/66219.aspx, search for "mutex" | |
throw new NotSupportedException("WaitAll for multiple handles on a STA thread is not supported."); | |
} | |
else | |
{ | |
// optimization: a quick check with a zero timeout | |
nativeResult = NativeMethods.WaitForMultipleObjects(count, waitHandles, bWaitAll: false, dwMilliseconds: 0); | |
if (IsNativeWaitSuccessful(count, nativeResult, out managedResult)) | |
return managedResult; | |
// proceed to pumping | |
// track timeout if not infinite | |
Func<bool> hasTimedOut = () => false; | |
int remainingTimeout = timeout; | |
if (remainingTimeout != Timeout.Infinite) | |
{ | |
int startTick = Environment.TickCount; | |
hasTimedOut = () => | |
{ | |
// Environment.TickCount wraps correctly even if runs continuously | |
int lapse = Environment.TickCount - startTick; | |
remainingTimeout = Math.Max(timeout - lapse, 0); | |
return remainingTimeout <= 0; | |
}; | |
} | |
// the core loop | |
var msg = new NativeMethods.MSG(); | |
while (true) | |
{ | |
// MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns, | |
// even if there's a message already seen but not removed in the message queue | |
nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx( | |
count, waitHandles, | |
(uint)remainingTimeout, | |
QS_MASK, | |
NativeMethods.MWMO_INPUTAVAILABLE); | |
if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult) | |
return managedResult; | |
// there is a message, pump and dispatch it | |
if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE)) | |
{ | |
NativeMethods.TranslateMessage(ref msg); | |
NativeMethods.DispatchMessage(ref msg); | |
} | |
if (hasTimedOut()) | |
return WaitHandle.WaitTimeout; | |
} | |
} | |
} | |
/// <summary> | |
/// Analyze the result of the native wait API | |
/// </summary> | |
static bool IsNativeWaitSuccessful(uint count, uint nativeResult, out int managedResult) | |
{ | |
if (nativeResult == (NativeMethods.WAIT_OBJECT_0 + count)) | |
{ | |
// a is message pending, only valid for MsgWaitForMultipleObjectsEx | |
managedResult = unchecked((int)nativeResult); | |
return false; | |
} | |
if (nativeResult >= NativeMethods.WAIT_OBJECT_0 && nativeResult < (NativeMethods.WAIT_OBJECT_0 + count)) | |
{ | |
managedResult = unchecked((int)(nativeResult - NativeMethods.WAIT_OBJECT_0)); | |
return true; | |
} | |
if (nativeResult >= NativeMethods.WAIT_ABANDONED_0 && nativeResult < (NativeMethods.WAIT_ABANDONED_0 + count)) | |
{ | |
managedResult = unchecked((int)(nativeResult - NativeMethods.WAIT_ABANDONED_0)); | |
throw new AbandonedMutexException(); | |
} | |
if (nativeResult == NativeMethods.WAIT_TIMEOUT) | |
{ | |
managedResult = WaitHandle.WaitTimeout; | |
return false; | |
} | |
throw new InvalidOperationException(); | |
} | |
#region NativeMethods | |
/// <summary> | |
/// NativeMethods - p/invoke declarations | |
/// </summary> | |
internal static class NativeMethods | |
{ | |
public const uint QS_KEY = 0x0001; | |
public const uint QS_MOUSEMOVE = 0x0002; | |
public const uint QS_MOUSEBUTTON = 0x0004; | |
public const uint QS_POSTMESSAGE = 0x0008; | |
public const uint QS_TIMER = 0x0010; | |
public const uint QS_PAINT = 0x0020; | |
public const uint QS_SENDMESSAGE = 0x0040; | |
public const uint QS_HOTKEY = 0x0080; | |
public const uint QS_ALLPOSTMESSAGE = 0x0100; | |
public const uint QS_RAWINPUT = 0x0400; | |
public const uint QS_MOUSE = (QS_MOUSEMOVE | QS_MOUSEBUTTON); | |
public const uint QS_INPUT = (QS_MOUSE | QS_KEY | QS_RAWINPUT); | |
public const uint QS_ALLEVENTS = (QS_INPUT | QS_POSTMESSAGE | QS_TIMER | QS_PAINT | QS_HOTKEY); | |
public const uint QS_ALLINPUT = (QS_INPUT | QS_POSTMESSAGE | QS_TIMER | QS_PAINT | QS_HOTKEY | QS_SENDMESSAGE); | |
public const uint MWMO_INPUTAVAILABLE = 0x0004; | |
public const uint MWMO_WAITALL = 0x0001; | |
public const uint PM_REMOVE = 0x0001; | |
public const uint PM_NOREMOVE = 0; | |
public const uint WAIT_TIMEOUT = 0x00000102; | |
public const uint WAIT_FAILED = 0xFFFFFFFF; | |
public const uint INFINITE = 0xFFFFFFFF; | |
public const uint WAIT_OBJECT_0 = 0; | |
public const uint WAIT_ABANDONED_0 = 0x00000080; | |
public const uint WAIT_IO_COMPLETION = 0x000000C0; | |
[StructLayout(LayoutKind.Sequential)] | |
public struct MSG | |
{ | |
public IntPtr hwnd; | |
public uint message; | |
public IntPtr wParam; | |
public IntPtr lParam; | |
public uint time; | |
public int x; | |
public int y; | |
} | |
[DllImport("user32.dll")] | |
[return: MarshalAs(UnmanagedType.Bool)] | |
public static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg); | |
[DllImport("user32.dll")] | |
[return: MarshalAs(UnmanagedType.Bool)] | |
public static extern bool PostMessage(IntPtr hWnd, uint msg, IntPtr wParam, IntPtr lParam); | |
[DllImport("user32.dll")] | |
[return: MarshalAs(UnmanagedType.Bool)] | |
public static extern bool SendMessage(IntPtr hWnd, uint msg, IntPtr wParam, IntPtr lParam); | |
[DllImport("user32.dll")] | |
public static extern IntPtr DispatchMessage([In] ref MSG lpMsg); | |
[DllImport("user32.dll")] | |
[return: MarshalAs(UnmanagedType.Bool)] | |
public static extern bool TranslateMessage([In] ref MSG lpMsg); | |
[DllImport("ole32.dll", PreserveSig = false)] | |
public static extern void OleInitialize(IntPtr pvReserved); | |
[DllImport("ole32.dll", PreserveSig = true)] | |
public static extern void OleUninitialize(); | |
[DllImport("kernel32.dll")] | |
public static extern uint GetTickCount(); | |
[DllImport("user32.dll")] | |
public static extern uint GetQueueStatus(uint flags); | |
[DllImport("user32.dll", SetLastError = true)] | |
public static extern uint MsgWaitForMultipleObjectsEx( | |
uint nCount, IntPtr[] pHandles, uint dwMilliseconds, uint dwWakeMask, uint dwFlags); | |
[DllImport("kernel32.dll", SetLastError = true)] | |
public static extern uint WaitForMultipleObjects( | |
uint nCount, IntPtr[] lpHandles, bool bWaitAll, uint dwMilliseconds); | |
[DllImport("kernel32.dll", SetLastError = true)] | |
public static extern uint WaitForSingleObject(IntPtr hHandle, uint dwMilliseconds); | |
[DllImport("kernel32.dll", SetLastError = true)] | |
public static extern IntPtr CreateEvent(IntPtr lpEventAttributes, bool bManualReset, bool bInitialState, string lpName); | |
[DllImport("kernel32.dll", SetLastError = true)] | |
[return: MarshalAs(UnmanagedType.Bool)] | |
public static extern bool SetEvent(IntPtr hEvent); | |
[DllImport("kernel32.dll", SetLastError = true)] | |
[return: MarshalAs(UnmanagedType.Bool)] | |
public static extern bool CloseHandle(IntPtr hObject); | |
} | |
#endregion | |
} | |
#endregion | |
/// <summary> | |
/// ThreadAffinityDispatcher - custom multi-threaded dispatcher that runs on top of a | |
/// <see cref="ThreadAffinityTaskScheduler"/> | |
/// | |
/// Relevant configuration options: | |
/// <code> | |
/// my-forkjoin-dispatcher { | |
/// type = ThreadAffinityDispatcher | |
/// throughput = 100 | |
/// dedicated-thread-pool { #settings for Helios.DedicatedThreadPool | |
/// thread-count = 3 #number of threads | |
/// #deadlock-timeout = 3s #optional timeout for deadlock detection | |
/// threadtype = background #values can be "background" or "foreground" | |
/// } | |
/// } | |
/// </code> | |
/// </summary> | |
internal sealed class ThreadAffinityExecutor : ExecutorService | |
{ | |
private ThreadAffinityTaskScheduler _dedicatedThreadPool; | |
private byte _shuttingDown = 0; | |
/// <summary> | |
/// TBD | |
/// </summary> | |
/// <param name="id">TBD</param> | |
/// <param name="poolSettings">TBD</param> | |
public ThreadAffinityExecutor(string id, AffinityThreadPoolSettings poolSettings) : base(id) | |
{ | |
_dedicatedThreadPool = new ThreadAffinityTaskScheduler(poolSettings); | |
} | |
/// <summary> | |
/// TBD | |
/// </summary> | |
/// <param name="run">TBD</param> | |
/// <exception cref="RejectedExecutionException"> | |
/// This exception is thrown if this method is called during the shutdown of this executor. | |
/// </exception> | |
public override void Execute(IRunnable run) | |
{ | |
if (Volatile.Read(ref _shuttingDown) == 1) | |
throw new RejectedExecutionException("ForkJoinExecutor is shutting down"); | |
; | |
new Task(()=>run.Run()).Start(_dedicatedThreadPool); | |
} | |
protected static readonly WaitCallback Executor = t => { ((IRunnable)t).Run(); }; | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public override void Shutdown() | |
{ | |
// shut down the dedicated threadpool and null it out | |
Volatile.Write(ref _shuttingDown, 1); | |
_dedicatedThreadPool?.Dispose(); | |
_dedicatedThreadPool = null; | |
} | |
} | |
/// <summary> | |
/// INTERNAL API | |
/// </summary> | |
internal sealed class ThreadAffinityExecutorServiceFactory : ExecutorServiceConfigurator | |
{ | |
/// <summary> | |
/// Used by <see cref="PinnedDispatcher"/> | |
/// </summary> | |
public static readonly Config SingleThreadDefault = ConfigurationFactory.ParseString(@" | |
dedicated-thread-pool{ | |
thread-count = 1 | |
threadtype = background | |
} | |
"); | |
private readonly AffinityThreadPoolSettings _threadPoolConfiguration; | |
/// <summary> | |
/// Initializes a new instance of the <see cref="ForkJoinExecutorServiceFactory"/> class. | |
/// </summary> | |
/// <param name="config">TBD</param> | |
/// <param name="prerequisites">TBD</param> | |
/// <exception cref="ConfigurationException"> | |
/// This exception is thrown if either 'dedicated-thread-pool' OR 'fork-join-executor' is not defined in <paramref name="config"/>. | |
/// </exception> | |
public ThreadAffinityExecutorServiceFactory(Config config, IDispatcherPrerequisites prerequisites) | |
: base(config, prerequisites) | |
{ | |
_threadPoolConfiguration = ConfigureSettings(Config); | |
} | |
/// <summary> | |
/// TBD | |
/// </summary> | |
/// <param name="id">TBD</param> | |
/// <returns>TBD</returns> | |
public override ExecutorService Produce(string id) | |
{ | |
return new ThreadAffinityExecutor(id, _threadPoolConfiguration); | |
} | |
private static AffinityThreadPoolSettings ConfigureSettings(Config config) | |
{ | |
var dtp = config.GetConfig("dedicated-thread-pool"); | |
if (dtp.IsNullOrEmpty()) throw new ConfigurationException( | |
$"must define section 'dedicated-thread-pool' for thread-affinity-executor {config.GetString("id", "unknown")}"); | |
var settings = new AffinityThreadPoolSettings( | |
dtp.GetInt("thread-count"), | |
DedicatedThreadPoolConfigHelpers.ConfigureThreadType( | |
dtp.GetString("threadtype", ThreadType.Background.ToString())), | |
config.GetString("id"), | |
DedicatedThreadPoolConfigHelpers.GetSafeDeadlockTimeout(dtp)); | |
return settings; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment