// Created December 5, 2020 18:10
// Thread Affinity Task Scheduler for Akka
// based heavily on
// //-----------------------------------------------------------------------
// // <copyright file="ThreadAffinityDispatcher.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2020 Lightbend Inc. <>
// // Copyright (C) 2013-2020 .NET Foundation <>
// // </copyright>
// //-----------------------------------------------------------------------
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Akka.Dispatch;
using Helios.Concurrency;
using Akka.Configuration;
namespace Akka.Dispatch
/// <summary>
/// Settings for the pool of threads behind a thread-affinity scheduler:
/// thread count, thread type, pool name and an optional deadlock timeout.
/// All properties are read-only from outside the class.
/// </summary>
internal class AffinityThreadPoolSettings
{
    /// <summary>
    /// Thread type used when the caller does not specify one (background threads).
    /// </summary>
    public const ThreadType DefaultThreadType = ThreadType.Background;

    /// <summary>
    /// Creates settings that use <see cref="DefaultThreadType"/>.
    /// </summary>
    /// <param name="numThreads">Number of threads; must be at least 1.</param>
    /// <param name="name">Optional pool name; a generated name is used when null.</param>
    /// <param name="deadlockTimeout">Optional deadlock timeout; must be null or positive.</param>
    public AffinityThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null)
        : this(numThreads, DefaultThreadType, name, deadlockTimeout)
    {
    }

    /// <summary>
    /// Creates settings with an explicit thread type.
    /// </summary>
    /// <param name="numThreads">Number of threads; must be at least 1.</param>
    /// <param name="threadType">Type of the threads in the pool.</param>
    /// <param name="name">Optional pool name; "DedicatedThreadPool-" plus a new GUID is used when null.</param>
    /// <param name="deadlockTimeout">Optional deadlock timeout; must be null or positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="deadlockTimeout"/> is zero or negative, or <paramref name="numThreads"/> is below 1.
    /// </exception>
    public AffinityThreadPoolSettings(
        int numThreads,
        ThreadType threadType,
        string name = null,
        TimeSpan? deadlockTimeout = null)
    {
        // The timeout is checked before the thread count, so it wins when both are invalid.
        bool timeoutNotPositive = deadlockTimeout?.TotalMilliseconds <= 0;
        if (timeoutNotPositive)
        {
            throw new ArgumentOutOfRangeException(
                nameof(deadlockTimeout),
                string.Format("deadlockTimeout must be null or at least 1ms. Was {0}.", deadlockTimeout));
        }

        if (numThreads < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(numThreads),
                string.Format("numThreads must be at least 1. Was {0}", numThreads));
        }

        NumThreads = numThreads;
        ThreadType = threadType;
        DeadlockTimeout = deadlockTimeout;
        Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid());
    }

    /// <summary>
    /// The total number of threads to run in this thread pool.
    /// </summary>
    public int NumThreads { get; private set; }

    /// <summary>
    /// The type of threads to run in this thread pool.
    /// </summary>
    public ThreadType ThreadType { get; private set; }

    /// <summary>
    /// Interval to check for thread deadlocks.
    /// If a thread takes longer than <see cref="DeadlockTimeout"/> it will be aborted
    /// and replaced.
    /// </summary>
    public TimeSpan? DeadlockTimeout { get; private set; }

    /// <summary>
    /// Name of the pool; never null after construction.
    /// </summary>
    public string Name { get; private set; }

    /// <summary>
    /// Exception handler for the pool. Neither constructor assigns it, so it stays null.
    /// </summary>
    public Action<Exception> ExceptionHandler { get; private set; }

    /// <summary>
    /// Gets the thread stack size, 0 represents the default stack size.
    /// Neither constructor assigns it, so it stays 0.
    /// </summary>
    public int ThreadMaxStackSize { get; private set; }
}
/// <summary>
/// A queued unit of work that a dispatching thread can run.
/// </summary>
internal interface IExecuteItem
{
    /// <summary>Runs the work item.</summary>
    void Execute();
}
/// <summary>
/// Customizes a blocking wait (e.g., with a message pump).
/// The parameters match those of the <c>SynchronizationContext.Wait</c> override in
/// <c>ThreadWithAffinityContext</c>, which forwards its arguments to this delegate unchanged.
/// </summary>
/// <param name="waitHandles">Native handles to wait on.</param>
/// <param name="waitAll">True to wait for all handles, false to wait for any.</param>
/// <param name="millisecondsTimeout">Timeout in milliseconds.</param>
/// <returns>The integer wait result that the caller hands back from <c>Wait</c>.</returns>
public delegate int WaitHelperFunc(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout);
/// <summary>
/// Customizes the blocking retrieval of a queued task item.
/// The dispatch loop of <c>ThreadWithAffinityContext</c> calls it with the context's own queue and its cancellation token.
/// </summary>
/// <param name="items">The queue to take an item from.</param>
/// <param name="token">Token used to cancel the blocking take.</param>
/// <returns>The next item to execute.</returns>
internal delegate IExecuteItem TakeFunc(BlockingCollection<IExecuteItem> items, CancellationToken token);
#region ThreadAffinityTaskScheduler
/// <summary>
/// A <see cref="TaskScheduler"/> backed by a fixed set of <see cref="ThreadWithAffinityContext"/>
/// instances ("apartments"), with optional message pumping.
/// Tasks queued on the scheduler go into one shared queue; each apartment takes work either
/// from that shared queue or from its own queue (see <see cref="Take"/>).
/// </summary>
public sealed class ThreadAffinityTaskScheduler : TaskScheduler, IDisposable
{
    /// <summary>Stores the queued tasks to be executed by one of our apartments. Null once disposed.</summary>
    private BlockingCollection<IExecuteItem> _items;

    /// <summary>The Cancellation Token Source object to request the termination.</summary>
    private readonly CancellationTokenSource _terminationCts;

    /// <summary>The list of contexts that represent apartments (threads).</summary>
    private readonly List<ThreadWithAffinityContext> _threads;

    /// <summary>Creates a scheduler from pool settings.</summary>
    /// <param name="settings">Pool settings; only the thread count and thread type are used.</param>
    // NOTE(review): "thread type is Background" is passed as the staThreads argument, so the default
    // settings request STA threads. Kept as-is to preserve behavior; confirm this mapping is intended.
    internal ThreadAffinityTaskScheduler(AffinityThreadPoolSettings settings)
        : this(settings.NumThreads, settings.ThreadType == ThreadType.Background)
    {
    }

    /// <summary>Initializes a new instance of the ThreadAffinityTaskScheduler class with the specified concurrency level.</summary>
    /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
    /// <param name="staThreads">Passed to each <see cref="ThreadWithAffinityContext"/> as its staThread argument.</param>
    /// <param name="waitHelper">Optional custom blocking wait, passed to each context.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numberOfThreads"/> is less than 1.</exception>
    public ThreadAffinityTaskScheduler(int numberOfThreads, bool staThreads = false, WaitHelperFunc waitHelper = null)
    {
        if (numberOfThreads < 1)
            throw new ArgumentOutOfRangeException("numberOfThreads");

        // Shared queue of tasks scheduled on this scheduler
        _items = new BlockingCollection<IExecuteItem>();

        // CTS to cancel task dispatching
        _terminationCts = new CancellationTokenSource();

        // Create the contexts (threads); each one pulls work through Take
        _threads = Enumerable.Range(0, numberOfThreads)
            .Select(i => new ThreadWithAffinityContext(_terminationCts.Token, staThreads, waitHelper, Take))
            .ToList();
    }

    /// <summary>
    /// Take a queued task item either from ThreadAffinityTaskScheduler or ThreadWithAffinityContext,
    /// blocks if both queues are empty
    /// </summary>
    /// <param name="items">The calling context's own queue.</param>
    /// <param name="token">Token that cancels the blocking take.</param>
    /// <returns>The item taken from either queue.</returns>
    internal IExecuteItem Take(BlockingCollection<IExecuteItem> items, CancellationToken token)
    {
        IExecuteItem item;
        BlockingCollection<IExecuteItem>.TakeFromAny(new[] { _items, items }, out item, token);
        return item;
    }

    /// <summary>Queues a Task to be executed by this scheduler.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
    protected override void QueueTask(Task task)
    {
        // Dispose nulls the field; fail with a meaningful exception instead of a NullReferenceException
        var queue = _items;
        if (queue == null)
            throw new ObjectDisposedException(nameof(ThreadAffinityTaskScheduler));

        queue.Add(new TaskItem(task, this.TryExecuteTask));
    }

    /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
    /// <returns>An enumerable of all tasks currently scheduled; empty once disposed.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        var queue = _items;
        if (queue == null)
            return Enumerable.Empty<Task>();

        // Only TaskItem instances are ever added to the shared queue (see QueueTask)
        return queue.Select(t => ((TaskItem)t).Task).ToArray();
    }

    /// <summary>Determines whether a Task may be inlined.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>Always false: inlining is not supported.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        //TODO: call TryExecuteTask(task) if can be done on the same thread
        // not sure if that's possible to implement as we don't have an antecedent task to match the thread
        return false;
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override int MaximumConcurrencyLevel
    {
        get { return _threads.Count; }
    }

    /// <summary>
    /// Requests termination and releases the queue, the contexts and the cancellation source.
    /// Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_items == null)
            return;

        // Request the cancellation so blocked takes on the apartment threads are released
        _terminationCts.Cancel();

        // Dispose each context before touching the shared queue they read from
        foreach (var context in _threads)
            context.Dispose();

        // Cleanup
        _items.Dispose();
        _items = null;
        _terminationCts.Dispose();
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew</summary>
    public Task Run(Action action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew that unwraps the inner task</summary>
    public Task Run(Func<Task> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
    }

    /// <summary>A handy wrapper around Task.Factory.StartNew that unwraps the inner task</summary>
    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
    }
}
#region TaskItem
/// <summary>
/// A simple container that pairs a <see cref="System.Threading.Tasks.Task"/> with the delegate
/// used to execute it, so the pair can be queued and run via <see cref="IExecuteItem.Execute"/>.
/// </summary>
internal class TaskItem : IExecuteItem
{
    // Delegate that actually runs the task (the scheduler passes its TryExecuteTask)
    readonly Func<Task, bool> _executeTask;

    /// <summary>The task carried by this item.</summary>
    public Task Task { get; private set; }

    /// <summary>For derived types; leaves the task and the delegate unset.</summary>
    protected TaskItem() { }

    /// <summary>Creates an item for the given task.</summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="executeTask">Delegate invoked with <paramref name="task"/> when the item is executed.</param>
    public TaskItem(Task task, Func<Task, bool> executeTask)
    {
        this.Task = task;
        _executeTask = executeTask;
    }

    /// <summary>Runs the task through the stored delegate; its boolean result is ignored.</summary>
    public void Execute()
    {
        _executeTask(this.Task);
    }
}
#region ThreadWithAffinityContext
/// <summary>
/// A <see cref="SynchronizationContext"/> that creates its own thread and queues posted callbacks
/// for that thread's dispatch loop, so continuations run on the same thread
/// (was: SingleThreadSynchronizationContext)
/// </summary>
/// <remarks>
/// NOTE(review): this copy of the file has no braces, and several statements that the comments below
/// describe (constructor chaining, installing the context, executing an item, starting and joining
/// the thread) are not visible. Restore them from the upstream source before building.
/// </remarks>
public class ThreadWithAffinityContext : SynchronizationContext, IDisposable
private BlockingCollection<IExecuteItem> _items; // pending tasks
WaitHelperFunc _waitHelper; // allows to customize the blocking wait (with message pump)
TakeFunc _takeFunc; // pump the task queue
Thread _thread; // the thread with the sync context installed on it
CancellationTokenSource _cts; // ask the thread to end
/// <summary>
/// The scheduler obtained on the context's thread via TaskScheduler.FromCurrentSynchronizationContext().
/// Assigned at the end of the internal constructor.
/// </summary>
public TaskScheduler Scheduler { get; private set; }
/// <summary>
/// Construct a standalone instance of ThreadWithAffinityContext
/// </summary>
/// <param name="staThread">Thread apartment option, forwarded to the internal constructor (presumably; see note below).</param>
/// <param name="pumpMessages">When true, <c>WaitHelpers.WaitWithMessageLoop</c> is used as the wait helper; otherwise null is passed.</param>
// NOTE(review): the "this(...)" initializer lines around the argument below are not visible in this copy.
public ThreadWithAffinityContext(bool staThread = true, bool pumpMessages = true) :
pumpMessages ? WaitHelpers.WaitWithMessageLoop : (WaitHelperFunc)null,
/// <summary>
/// Construct a standalone instance of ThreadWithAffinityContext with cancellation
/// </summary>
/// <param name="token">Cancellation token (presumably forwarded to the internal constructor; confirm).</param>
/// <param name="staThread">Thread apartment option, forwarded to the internal constructor (presumably; see note below).</param>
/// <param name="pumpMessages">When true, <c>WaitHelpers.WaitWithMessageLoop</c> is used as the wait helper; otherwise null is passed.</param>
// NOTE(review): the "this(...)" initializer lines around the argument below are not visible in this copy.
public ThreadWithAffinityContext(CancellationToken token, bool staThread = true, bool pumpMessages = true) :
pumpMessages ? WaitHelpers.WaitWithMessageLoop : (WaitHelperFunc)null,
/// <summary>
/// Construct an instance of ThreadWithAffinityContext to be used by ThreadAffinityTaskScheduler
/// </summary>
/// <param name="token">External token; the context links its own cancellation source to it.</param>
/// <param name="staThread">Checked after the thread is created (see the STA comment below).</param>
/// <param name="waitHelper">Custom blocking wait; when null, the inherited WaitHelper is used.</param>
/// <param name="takeFunc">Custom item retrieval; when null, the static <c>Take</c> of this class is used.</param>
internal ThreadWithAffinityContext(CancellationToken token, bool staThread, WaitHelperFunc waitHelper, TakeFunc takeFunc)
_waitHelper = waitHelper;
_items = new BlockingCollection<IExecuteItem>();
_takeFunc = takeFunc;
// cancelling either the external token or _cts ends the dispatch loop's blocking take
_cts = CancellationTokenSource.CreateLinkedTokenSource(token);
// default: take only from this context's own queue
if (_takeFunc == null)
_takeFunc = Take;
// this makes our override of SynchronizationContext.Wait get called
// NOTE(review): only the fallback assignment is visible here; whatever statement the comment above
// refers to is not visible in this copy — confirm against the upstream source.
if (_waitHelper == null)
_waitHelper = WaitHelper;
// use TCS to return SingleThreadSynchronizationContext to the task scheduler
var tcs = new TaskCompletionSource<TaskScheduler>();
_thread = new Thread(() =>
// install on the current thread
// NOTE(review): the statement that installs this context on the thread is not visible in this copy.
tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext()); // the sync. context is ready, return it to the task scheduler
// the thread's core task dispatching loop, terminate-able with token
while (true)
// blocks until an item is available or the token is cancelled
var executeItem = _takeFunc(_items, _cts.Token);
// execute a Task queued by ThreadAffinityTaskScheduler.QueueTask
// or a callback queued by SingleThreadSynchronizationContext.Post
// NOTE(review): the call that executes executeItem is not visible in this copy.
catch (OperationCanceledException)
// ignore OperationCanceledException exceptions when terminating
// NOTE(review): the statement guarded by this check (presumably a rethrow) is not visible in this copy.
if (!_cts.Token.IsCancellationRequested)
// make it an STA thread if message pumping is requested
// NOTE(review): the statement guarded by this check is not visible in this copy.
if (staThread)
_thread.IsBackground = true;
// reading Result blocks the constructing thread until the new thread has called tcs.SetResult
// NOTE(review): the statement that starts the thread is not visible in this copy.
this.Scheduler = tcs.Task.Result;
/// <summary>Queues the callback, wrapped in an ActionItem, on this context's own queue.</summary>
/// <param name="d">The callback to run.</param>
/// <param name="state">The argument passed to the callback.</param>
public override void Post(SendOrPostCallback d, object state)
_items.Add(new ActionItem(() => d(state)));
/// <summary>Synchronous dispatch is not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override void Send(SendOrPostCallback d, object state)
//TODO: currently we don't support (and expect) synchronous callbacks
throw new NotImplementedException();
/// <summary>Returns this same instance rather than a copy.</summary>
public override SynchronizationContext CreateCopy()
// more info:
return this;
/// <summary>Delegates the blocking wait to the configured wait helper.</summary>
/// <returns>The value returned by the wait helper.</returns>
public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
// this can pump if needed
return _waitHelper(waitHandles, waitAll, millisecondsTimeout);
/// <summary>
/// Clears the thread and queue references.
/// NOTE(review): the cancel/wait/dispose calls that the comments mention are not visible in this copy.
/// </summary>
public void Dispose()
// cleanup
if (_thread != null)
// request the cancellation and wait
_thread = null;
if (_items != null)
_items = null;
/// <summary>A handy wrapper around Task.Factory.StartNew, targeting <see cref="Scheduler"/></summary>
public Task Run(Action action, CancellationToken token)
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler);
/// <summary>A handy wrapper around Task.Factory.StartNew, targeting <see cref="Scheduler"/>; unwraps the inner task</summary>
public Task Run(Func<Task> action, CancellationToken token)
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
/// <summary>A handy wrapper around Task.Factory.StartNew, targeting <see cref="Scheduler"/>; unwraps the inner task</summary>
public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token)
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
/// <summary>
/// Take queued task item, blocks if the queue is empty
/// </summary>
/// <param name="items">The queue to take from.</param>
/// <param name="token">Token that cancels the blocking take.</param>
/// <returns>The item removed from the queue.</returns>
static internal IExecuteItem Take(BlockingCollection<IExecuteItem> items, CancellationToken token)
return items.Take(token);
/// <summary>
/// A simple container that wraps an <see cref="Action"/> so it can be queued and run
/// via <see cref="IExecuteItem.Execute"/>.
/// </summary>
class ActionItem : IExecuteItem
{
    // The callback to run when the item is executed
    readonly Action _action;

    /// <summary>For derived types; leaves the action unset.</summary>
    protected ActionItem() { }

    /// <summary>Creates an item for the given action.</summary>
    /// <param name="action">The callback invoked by <see cref="Execute"/>.</param>
    public ActionItem(Action action)
    {
        _action = action;
    }

    /// <summary>Invokes the wrapped action.</summary>
    public void Execute()
    {
        _action();
    }
}
#region WaitHelpers
/// <summary>
/// Wait for handle(s) with message loop and timeout
/// </summary>
public static class WaitHelpers
{
    /// <summary>
    /// Wait for handles similar to SynchronizationContext.WaitHelper, but with pumping
    /// </summary>
    /// <param name="waitHandles">Native handles to wait on; must contain at least one handle.</param>
    /// <param name="waitAll">Waiting for all of several handles is not supported.</param>
    /// <param name="timeout">Timeout in milliseconds, or <see cref="Timeout.Infinite"/>.</param>
    /// <returns>
    /// The index of the signaled handle, or <see cref="WaitHandle.WaitTimeout"/> when the wait timed out.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="waitHandles"/> is null or empty.</exception>
    /// <exception cref="NotSupportedException"><paramref name="waitAll"/> is true and more than one handle was passed.</exception>
    /// <exception cref="AbandonedMutexException">The native wait reported an abandoned handle.</exception>
    /// <exception cref="InvalidOperationException">The native wait returned an unexpected result.</exception>
    public static int WaitWithMessageLoop(IntPtr[] waitHandles, bool waitAll, int timeout)
    {
        // Don't use CoWaitForMultipleHandles, it has issues with message pumping
        const uint QS_MASK = NativeMethods.QS_ALLINPUT; // message queue status

        // Validate before touching Length, so a null array gives ArgumentNullException
        // (the same exception type is kept for an empty array, as before)
        if (waitHandles == null || waitHandles.Length == 0)
            throw new ArgumentNullException(nameof(waitHandles));

        uint count = (uint)waitHandles.Length;
        uint nativeResult; // result of the native wait API (WaitForMultipleObjects or MsgWaitForMultipleObjectsEx)
        int managedResult; // result to return from WaitHelper

        // wait for all?
        if (waitAll && count > 1)
            throw new NotSupportedException("WaitAll for multiple handles on a STA thread is not supported.");

        // optimization: a quick check with a zero timeout
        nativeResult = NativeMethods.WaitForMultipleObjects(count, waitHandles, bWaitAll: false, dwMilliseconds: 0);
        if (IsNativeWaitSuccessful(count, nativeResult, out managedResult))
            return managedResult;

        // proceed to pumping

        // track timeout if not infinite
        Func<bool> hasTimedOut = () => false;
        int remainingTimeout = timeout;
        if (remainingTimeout != Timeout.Infinite)
        {
            int startTick = Environment.TickCount;
            hasTimedOut = () =>
            {
                // Environment.TickCount wraps correctly even if runs continuously
                int lapse = Environment.TickCount - startTick;
                remainingTimeout = Math.Max(timeout - lapse, 0);
                return remainingTimeout <= 0;
            };
        }

        // the core loop
        var msg = new NativeMethods.MSG();
        while (true)
        {
            // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
            // even if there's a message already seen but not removed in the message queue.
            // unchecked: Timeout.Infinite (-1) must map to 0xFFFFFFFF (INFINITE).
            nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
                count, waitHandles,
                unchecked((uint)remainingTimeout), QS_MASK, NativeMethods.MWMO_INPUTAVAILABLE);

            if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
                return managedResult;

            // there is a message, pump and dispatch it
            if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
            {
                NativeMethods.TranslateMessage(ref msg);
                NativeMethods.DispatchMessage(ref msg);
            }

            if (hasTimedOut())
                return WaitHandle.WaitTimeout;
        }
    }

    /// <summary>
    /// Analyze the result of the native wait API
    /// </summary>
    /// <param name="count">Number of handles passed to the native wait.</param>
    /// <param name="nativeResult">Raw return value of the native wait.</param>
    /// <param name="managedResult">
    /// The signaled handle's index on success, <see cref="WaitHandle.WaitTimeout"/> on timeout,
    /// or the raw native value when a message is pending.
    /// </param>
    /// <returns>True when one of the handles was signaled; false on timeout or pending message.</returns>
    static bool IsNativeWaitSuccessful(uint count, uint nativeResult, out int managedResult)
    {
        if (nativeResult == (NativeMethods.WAIT_OBJECT_0 + count))
        {
            // a message is pending, only valid for MsgWaitForMultipleObjectsEx
            managedResult = unchecked((int)nativeResult);
            return false;
        }

        if (nativeResult >= NativeMethods.WAIT_OBJECT_0 && nativeResult < (NativeMethods.WAIT_OBJECT_0 + count))
        {
            managedResult = unchecked((int)(nativeResult - NativeMethods.WAIT_OBJECT_0));
            return true;
        }

        if (nativeResult >= NativeMethods.WAIT_ABANDONED_0 && nativeResult < (NativeMethods.WAIT_ABANDONED_0 + count))
        {
            managedResult = unchecked((int)(nativeResult - NativeMethods.WAIT_ABANDONED_0));
            throw new AbandonedMutexException();
        }

        if (nativeResult == NativeMethods.WAIT_TIMEOUT)
        {
            managedResult = WaitHandle.WaitTimeout;
            return false;
        }

        // anything else (including WAIT_FAILED) is unexpected
        throw new InvalidOperationException();
    }
}
#region NativeMethods
/// <summary>
/// NativeMethods - p/invoke declarations and Win32 constants used by the message-pumping wait.
/// </summary>
internal static class NativeMethods
{
    // Message queue status flags (QS_*), used as the wake mask for MsgWaitForMultipleObjectsEx
    public const uint QS_KEY = 0x0001;
    public const uint QS_MOUSEMOVE = 0x0002;
    public const uint QS_MOUSEBUTTON = 0x0004;
    public const uint QS_POSTMESSAGE = 0x0008;
    public const uint QS_TIMER = 0x0010;
    public const uint QS_PAINT = 0x0020;
    public const uint QS_SENDMESSAGE = 0x0040;
    public const uint QS_HOTKEY = 0x0080;
    public const uint QS_ALLPOSTMESSAGE = 0x0100;
    public const uint QS_RAWINPUT = 0x0400;

    // Composite masks, defined as in the Win32 headers
    public const uint QS_MOUSE = (QS_MOUSEMOVE | QS_MOUSEBUTTON);
    public const uint QS_INPUT = (QS_MOUSE | QS_KEY | QS_RAWINPUT);
    public const uint QS_ALLINPUT = (QS_INPUT | QS_POSTMESSAGE | QS_TIMER | QS_PAINT | QS_HOTKEY | QS_SENDMESSAGE);

    // MsgWaitForMultipleObjectsEx flags
    public const uint MWMO_INPUTAVAILABLE = 0x0004;
    public const uint MWMO_WAITALL = 0x0001;

    // PeekMessage removal options
    public const uint PM_REMOVE = 0x0001;
    public const uint PM_NOREMOVE = 0;

    // Wait results and timeouts
    public const uint WAIT_TIMEOUT = 0x00000102;
    public const uint WAIT_FAILED = 0xFFFFFFFF;
    public const uint INFINITE = 0xFFFFFFFF;
    public const uint WAIT_OBJECT_0 = 0;
    public const uint WAIT_ABANDONED_0 = 0x00000080;
    public const uint WAIT_IO_COMPLETION = 0x000000C0;

    /// <summary>Win32 MSG structure; x and y together correspond to the native POINT member.</summary>
    [StructLayout(LayoutKind.Sequential)]
    public struct MSG
    {
        public IntPtr hwnd;
        public uint message;
        public IntPtr wParam;
        public IntPtr lParam;
        public uint time;
        public int x;
        public int y;
    }

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool PostMessage(IntPtr hWnd, uint msg, IntPtr wParam, IntPtr lParam);

    // NOTE(review): the native SendMessage returns an LRESULT, not a BOOL; the declared return type
    // is kept for compatibility. It is not called anywhere in the visible code.
    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool SendMessage(IntPtr hWnd, uint msg, IntPtr wParam, IntPtr lParam);

    [DllImport("user32.dll")]
    public static extern IntPtr DispatchMessage([In] ref MSG lpMsg);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool TranslateMessage([In] ref MSG lpMsg);

    [DllImport("ole32.dll", PreserveSig = false)]
    public static extern void OleInitialize(IntPtr pvReserved);

    [DllImport("ole32.dll", PreserveSig = true)]
    public static extern void OleUninitialize();

    [DllImport("kernel32.dll")]
    public static extern uint GetTickCount();

    [DllImport("user32.dll")]
    public static extern uint GetQueueStatus(uint flags);

    [DllImport("user32.dll", SetLastError = true)]
    public static extern uint MsgWaitForMultipleObjectsEx(
        uint nCount, IntPtr[] pHandles, uint dwMilliseconds, uint dwWakeMask, uint dwFlags);

    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern uint WaitForMultipleObjects(
        uint nCount, IntPtr[] lpHandles, bool bWaitAll, uint dwMilliseconds);

    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern uint WaitForSingleObject(IntPtr hHandle, uint dwMilliseconds);

    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern IntPtr CreateEvent(IntPtr lpEventAttributes, bool bManualReset, bool bInitialState, string lpName);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool SetEvent(IntPtr hEvent);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    public static extern bool CloseHandle(IntPtr hObject);
}
/// <summary>
/// ThreadAffinityExecutor - executor service that runs work on top of a
/// <see cref="ThreadAffinityTaskScheduler"/>
/// Relevant configuration options:
/// <code>
/// my-forkjoin-dispatcher {
///     type = ThreadAffinityDispatcher
///     throughput = 100
///     dedicated-thread-pool { #settings for Helios.DedicatedThreadPool
///         thread-count = 3 #number of threads
///         #deadlock-timeout = 3s #optional timeout for deadlock detection
///         threadtype = background #values can be "background" or "foreground"
///     }
/// }
/// </code>
/// </summary>
internal sealed class ThreadAffinityExecutor : ExecutorService
{
    // Null once Shutdown has run
    private ThreadAffinityTaskScheduler _dedicatedThreadPool;

    // 1 once Shutdown has been requested
    private byte _shuttingDown = 0;

    /// <summary>
    /// Creates the executor and its underlying scheduler.
    /// </summary>
    /// <param name="id">Executor id, passed to the base class.</param>
    /// <param name="poolSettings">Settings used to create the <see cref="ThreadAffinityTaskScheduler"/>.</param>
    public ThreadAffinityExecutor(string id, AffinityThreadPoolSettings poolSettings) : base(id)
    {
        _dedicatedThreadPool = new ThreadAffinityTaskScheduler(poolSettings);
    }

    /// <summary>
    /// Schedules <paramref name="run"/> as a task on the underlying scheduler.
    /// </summary>
    /// <param name="run">The work to execute.</param>
    /// <exception cref="RejectedExecutionException">
    /// This exception is thrown if this method is called during the shutdown of this executor.
    /// </exception>
    public override void Execute(IRunnable run)
    {
        // Snapshot the field: Shutdown may null it concurrently, and Task.Start rejects a null scheduler
        var pool = Volatile.Read(ref _dedicatedThreadPool);
        if (Volatile.Read(ref _shuttingDown) == 1 || pool == null)
            throw new RejectedExecutionException("ThreadAffinityExecutor is shutting down");

        new Task(() => run.Run()).Start(pool);
    }

    // Not used by the visible code; kept for compatibility
    protected static readonly WaitCallback Executor = t => { ((IRunnable)t).Run(); };

    /// <summary>
    /// Marks the executor as shutting down, then disposes and releases the underlying scheduler.
    /// Safe to call more than once.
    /// </summary>
    public override void Shutdown()
    {
        // shut down the dedicated threadpool and null it out
        Volatile.Write(ref _shuttingDown, 1);
        var pool = Interlocked.Exchange(ref _dedicatedThreadPool, null);
        if (pool != null)
            pool.Dispose();
    }
}
/// <summary>
/// Executor-service configurator that produces <see cref="ThreadAffinityExecutor"/> instances
/// from the 'dedicated-thread-pool' section of a dispatcher configuration.
/// </summary>
internal sealed class ThreadAffinityExecutorServiceFactory : ExecutorServiceConfigurator
/// <summary>
/// Default configuration with one background thread.
/// Per the original comment, used by <see cref="PinnedDispatcher"/> (not verifiable from this file).
/// NOTE(review): the configuration literal below is incomplete in this copy (its closing line,
/// and possibly an opening section line, are not visible); restore it from the upstream source.
/// </summary>
public static readonly Config SingleThreadDefault = ConfigurationFactory.ParseString(@"
thread-count = 1
threadtype = background
private readonly AffinityThreadPoolSettings _threadPoolConfiguration;
/// <summary>
/// Initializes a new instance of the <see cref="ThreadAffinityExecutorServiceFactory"/> class
/// and reads the thread pool settings from the configuration.
/// </summary>
/// <param name="config">Dispatcher configuration, passed to the base class.</param>
/// <param name="prerequisites">Dispatcher prerequisites, passed to the base class.</param>
/// <exception cref="ConfigurationException">
/// This exception is thrown if the 'dedicated-thread-pool' section is missing or empty in the configuration.
/// </exception>
public ThreadAffinityExecutorServiceFactory(Config config, IDispatcherPrerequisites prerequisites)
: base(config, prerequisites)
_threadPoolConfiguration = ConfigureSettings(Config);
/// <summary>
/// Creates a new <see cref="ThreadAffinityExecutor"/> using the settings read at construction time.
/// </summary>
/// <param name="id">Id given to the new executor.</param>
/// <returns>A new <see cref="ThreadAffinityExecutor"/>.</returns>
public override ExecutorService Produce(string id)
return new ThreadAffinityExecutor(id, _threadPoolConfiguration);
/// <summary>
/// Builds the thread pool settings from the 'dedicated-thread-pool' section of <paramref name="config"/>.
/// </summary>
/// <param name="config">Configuration expected to contain a 'dedicated-thread-pool' section.</param>
/// <returns>The settings built from that section.</returns>
/// <exception cref="ConfigurationException">The 'dedicated-thread-pool' section is missing or empty.</exception>
private static AffinityThreadPoolSettings ConfigureSettings(Config config)
var dtp = config.GetConfig("dedicated-thread-pool");
// the message includes the configured 'id', or "unknown" when it is not set
if (dtp.IsNullOrEmpty()) throw new ConfigurationException(
$"must define section 'dedicated-thread-pool' for thread-affinity-executor {config.GetString("id", "unknown")}");
// NOTE(review): the argument list below is incomplete in this copy — only the tail of the
// thread-type argument (defaulting to "Background") is visible; restore the remaining arguments.
var settings = new AffinityThreadPoolSettings(
dtp.GetString("threadtype", ThreadType.Background.ToString())),
return settings;
