The goal of a ForkJoinDispatcher is to provide a group of dedicated threads, managed outside the CLR ThreadPool, that can be shared by many actors / callers concurrently. In an ideal world, the CLR would provide us with a way of creating instances of ThreadPool; we would happily wrap one of those inside the ForkJoinDispatcher and call UnsafeQueueUserWorkItem(wc, null) for each piece of asynchronous work we need done. However, since this isn't the case, we have to build one ourselves. We need a way to manage multiple threads and have them cooperatively and efficiently perform queued work the way the ThreadPool would.
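For concreteness, here's roughly what that ideal world looks like today against the one process-wide ThreadPool (illustration only; the missing piece is the ability to create an isolated instance of it):

```csharp
using System;
using System.Threading;

class ThreadPoolQueueExample
{
    static void Main()
    {
        // With the process-wide CLR ThreadPool this is all a dispatcher would need to do
        // per work item. There is no supported way to create an isolated ThreadPool
        // instance, which is why a ForkJoinDispatcher has to manage its own threads.
        WaitCallback wc = state => Console.WriteLine("doing asynchronous work");
        ThreadPool.UnsafeQueueUserWorkItem(wc, null);

        Thread.Sleep(100); // give the work item a chance to run before the process exits
    }
}
```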
The ForkJoinDispatcher is intended to be used for mission-critical system actors, such as the Remoting and Clustering actors in Akka.NET. In situations where lots of TPL Tasks are being run or the actor system is processing a large number of messages, it's quite easy for the ThreadPool to become saturated and for these actors to experience long delays in delivering heartbeats and ACKs to each other. As a result, our Remoting and Clustering modules often inadvertently mark other nodes as down despite the fact that the underlying network connection is actually healthy. The ultimate goal of the ForkJoinDispatcher is to provide isolation guarantees for actors that are sensitive to "noisy neighbor" problems on the ThreadPool.
There may very well be user-defined situations with characteristics similar to the Remoting and Clustering system actors in Akka.NET, so we want to expose this capability publicly via configuration rather than have it exist as a piece of Akka.NET internals.
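For comparison, JVM Akka exposes its fork-join executor through dispatcher configuration roughly like this; Akka.NET would presumably want something in the same spirit, though the exact keys here are not a committed format for us:

```hocon
my-forkjoin-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-max = 10
  }
  throughput = 100
}
```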
The ForkJoinDispatcher must satisfy the following requirements (a rough sketch of one possible public surface follows the list):

- There can be more than one ForkJoinDispatcher instance running at once, with different configurations, names, and threads;
- The ForkJoinDispatcher can have a configurable level of concurrency - i.e. use between 2-3 threads, use exactly 10 threads, etc.;
- The ForkJoinDispatcher must be able to queue work to be executed in the future - it shouldn't need a thread to be available right this second in order to execute an Action;
- The ForkJoinDispatcher should be able to support scheduling Task instances in addition to executing arbitrary Action delegates.
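A minimal sketch of what that public surface could look like, purely illustrative (every name below is a placeholder, not a committed API):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical shape only - it exists to illustrate the requirements above.
public interface IForkJoinDispatcher : IDisposable
{
    string Name { get; }          // multiple named instances can run side by side
    int MinThreads { get; }       // configurable level of concurrency
    int MaxThreads { get; }

    void Schedule(Action work);           // queue an Action for future execution
    Task ScheduleTask(Func<Task> work);   // support Task scheduling as well as Actions
}
```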
Here's an idea of my own version of this, taken from the DedicatedThreadPoolFiber I created as part of Helios:
https://github.com/helios-io/helios/blob/dev/src/Helios/Concurrency/Impl/DedicatedThreadPoolFiber.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// IFiber, IExecutor, BasicExecutor, TaskRunner, and the NotNegative/NotLessThan guard
// extensions are all defined elsewhere in the Helios codebase.
namespace Helios.Concurrency.Impl
{
    public class DedicatedThreadPoolFiber : IFiber
    {
        private readonly int _numThreads;
        private List<Thread> _threads;

        // Fixed-capacity work queue shared by all of the fiber's threads
        private readonly BlockingCollection<Action> _blockingCollection = new BlockingCollection<Action>(25000);

        public DedicatedThreadPoolFiber(int numThreads)
            : this(new BasicExecutor(), numThreads)
        {
        }

        public DedicatedThreadPoolFiber(IExecutor executor, int numThreads)
        {
            Executor = executor ?? new BasicExecutor();
            numThreads.NotNegative();
            numThreads.NotLessThan(1);
            _numThreads = numThreads;
            SpawnThreads(numThreads);
            Running = true;
        }

        protected void SpawnThreads(int threadCount)
        {
            _threads = new List<Thread>(threadCount);
            for (var i = 0; i < threadCount; i++)
            {
                var thread = new Thread(_ =>
                {
                    // Each thread blocks on the shared queue and runs work items
                    // until the executor stops accepting jobs
                    foreach (var task in _blockingCollection.GetConsumingEnumerable())
                    {
                        Executor.Execute(task);
                        if (!Executor.AcceptingJobs) return;
                    }
                }) { IsBackground = true };

                thread.Start();
                _threads.Add(thread);
            }
        }

        private volatile IExecutor _executor;
        public IExecutor Executor { get { return _executor; } set { _executor = value; } }

        public bool WasDisposed { get; private set; }

        public bool Running { get; set; }

        public void Add(Action op)
        {
            if (Running)
                _blockingCollection.Add(op);
        }

        public void SwapExecutor(IExecutor executor)
        {
            //Shut down the previous executor gracefully (in case there's thread-contention)
            Executor.GracefulShutdown(TimeSpan.FromSeconds(3));
            Executor = executor;
        }

        public void Shutdown(TimeSpan gracePeriod)
        {
            Running = false;
            Executor.Shutdown(gracePeriod);
        }

        public Task GracefulShutdown(TimeSpan gracePeriod)
        {
            Shutdown(gracePeriod);
            return TaskRunner.Delay(gracePeriod);
        }

        public void Stop()
        {
            Executor.Shutdown();
        }

        #region IDisposable members

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        public void Dispose(bool isDisposing)
        {
            if (!WasDisposed)
            {
                if (isDisposing)
                {
                    Shutdown(TimeSpan.Zero);
                    _threads = null;
                }
            }
            WasDisposed = true;
        }

        public IFiber Clone()
        {
            return new DedicatedThreadPoolFiber(Executor.Clone(), _numThreads);
        }

        #endregion
    }
}
Some of the Akka.NET folks don't like this because of the fixed capacity of the BlockingCollection, but it meets the basic requirements (minus Task scheduling) I listed earlier.
What's a better way to build this?
The main issue I see is that there will be a lot of contention on that blocking queue when multiple threads compete to pick work from it. I used the blocking collection at the very early stages of Akka.NET/Pigeon, and the perf was far from impressive. So I'd like to see something that can compete with the standard ThreadPool in terms of perf.
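One possible direction (a sketch only, not benchmarked here) is to replace the bounded BlockingCollection with a lock-free ConcurrentQueue plus a SemaphoreSlim, so enqueues never take a lock and idle threads park cheaply:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Sketch of a lower-contention work queue: lock-free enqueue/dequeue via ConcurrentQueue,
// with a SemaphoreSlim used only to wake sleeping worker threads.
public class SimpleWorkQueue
{
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void Enqueue(Action work)
    {
        _queue.Enqueue(work);
        _signal.Release(); // wake one waiting worker
    }

    // Each dedicated worker thread calls this in a loop.
    public Action Dequeue(CancellationToken token)
    {
        _signal.Wait(token);          // one successful wait per enqueued item
        Action work;
        _queue.TryDequeue(out work);
        return work;
    }
}
```

A further step would be per-thread queues with work stealing, which is roughly how the CLR ThreadPool and the JVM's ForkJoinPool avoid a single point of contention.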
Regarding running tasks: that is not up to the thread pool itself; it is up to whatever TaskScheduler is active to run the job on our specific thread pools. For example, the async/await support we have already does this: whatever thread the actor runs on, any non-IO-completion task continuation will be scheduled onto the same thread as the actor. So we need a specific TaskScheduler for this if we are not using our existing one.
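As a minimal sketch of that idea, here's a TaskScheduler that pushes every queued Task onto the dedicated pool; it assumes something shaped like the DedicatedThreadPoolFiber above, using its Add(Action) method as the queueing primitive:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch: a TaskScheduler that runs all queued Tasks on the dedicated thread pool.
public class DedicatedThreadPoolTaskScheduler : TaskScheduler
{
    private readonly DedicatedThreadPoolFiber _fiber;

    public DedicatedThreadPoolTaskScheduler(DedicatedThreadPoolFiber fiber)
    {
        _fiber = fiber;
    }

    protected override void QueueTask(Task task)
    {
        // Hand the task to one of the dispatcher's dedicated threads.
        _fiber.Add(() => TryExecuteTask(task));
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Keep it simple: never inline, always go through the queue.
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Only used for debugger support; tasks aren't tracked in this sketch.
        return new Task[0];
    }
}
```

Work can then be pinned to the pool with Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, scheduler), and continuations that specify this scheduler will stay on it as well.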