@Aaronontheweb
Last active August 29, 2015 14:16
ForkJoinDispatcher / DedicatedThreadFiber Spec

ForkJoinDispatcher

Abstract

The goal of a ForkJoinDispatcher is to provide a group of dedicated threads, managed outside the CLR ThreadPool, that can be shared concurrently by many actors / callers. In an ideal world, the CLR would provide us with a way of creating instances of ThreadPool and we would happily wrap one of those inside the ForkJoinDispatcher and call UnsafeQueueUserWorkItem(wc, null) for each piece of asynchronous work we need done.

However, since this isn't the case, we have to build one. We need a way to manage multiple threads and have them cooperatively and efficiently perform queued work like the ThreadPool would.

Why ForkJoinDispatcher is Necessary

The ForkJoinDispatcher is intended to be used for mission-critical system actors, such as the Remoting and Clustering actors in Akka.NET. In situations where lots of TPL Tasks are being run or the actor system is processing a large number of messages, it's quite easy for the ThreadPool to become saturated and for these actors to experience long delays in delivering heartbeats and ACKs to each other. As a result, our Remoting and Clustering modules often inadvertently mark other nodes as down even though the underlying network connection is actually healthy.
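
To make the saturation problem concrete, here is a rough sketch (not taken from Akka.NET) that floods the shared ThreadPool with blocking work and then times how long a "heartbeat" callback waits on the shared pool versus on a dedicated thread. The class name NoisyNeighborDemo, the number of noisy items, and the 2 second sleep are arbitrary assumptions, and the measured delays will vary by machine and runtime.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical demo class; the names and numbers are illustrative only.
public static class NoisyNeighborDemo
{
    public static void Main()
    {
        // "Noisy neighbors": blocking work items that tie up pool threads.
        var noisyItems = Environment.ProcessorCount * 8;
        for (var i = 0; i < noisyItems; i++)
            ThreadPool.QueueUserWorkItem(_ => Thread.Sleep(2000));

        var clock = Stopwatch.StartNew();

        // Heartbeat queued behind the noisy work on the shared pool.
        long sharedDelayMs = -1;
        var sharedDone = new ManualResetEventSlim(false);
        ThreadPool.QueueUserWorkItem(_ =>
        {
            sharedDelayMs = clock.ElapsedMilliseconds;
            sharedDone.Set();
        });

        // The same heartbeat on a thread that the pool does not own.
        long dedicatedDelayMs = -1;
        var dedicated = new Thread(() => dedicatedDelayMs = clock.ElapsedMilliseconds)
        {
            IsBackground = true
        };
        dedicated.Start();
        dedicated.Join();
        sharedDone.Wait();

        Console.WriteLine("shared pool heartbeat delay (ms): " + sharedDelayMs);
        Console.WriteLine("dedicated thread heartbeat delay (ms): " + dedicatedDelayMs);
    }
}
```

The dedicated thread does not queue behind the blocked pool threads, which is the isolation property this spec is after.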

The ultimate goal of the ForkJoinDispatcher is to provide isolation guarantees for actors who are sensitive to "noisy neighbor" problems when it comes to the ThreadPool.

There may very well be user-defined situations with characteristics similar to the Remoting and Clustering system actors in Akka.NET. Therefore we want to expose this capability publicly via configuration rather than have it exist only as an Akka.NET internal.

Functional Requirements

The ForkJoinDispatcher must satisfy the following requirements:

  1. There can be more than one ForkJoinDispatcher instance running at once, with different configurations, names, and threads;
  2. The ForkJoinDispatcher can have a configurable level of concurrency - e.g. use between 2-3 threads, use exactly 10 threads, etc;
  3. The ForkJoinDispatcher must be able to queue work to be executed in the future - it shouldn't need a thread to be available right this second in order to execute an Action;
  4. The ForkJoinDispatcher should be able to support scheduling Task instances in addition to executing arbitrary Action delegates.
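
As a rough illustration of requirements 1-3, here is a minimal sketch of a named pool with its own threads and a work queue. DedicatedThreadPoolSettings and DedicatedThreadPool are hypothetical names used only for this example, not existing Akka.NET or Helios types. The sketch assumes a fixed thread count (no min/max range), uses an unbounded BlockingCollection, and does not attempt Task scheduling (requirement 4).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical settings type: one instance per pool (requirements 1 and 2).
public sealed class DedicatedThreadPoolSettings
{
    public DedicatedThreadPoolSettings(string name, int threadCount)
    {
        if (threadCount < 1) throw new ArgumentOutOfRangeException("threadCount");
        Name = name;
        ThreadCount = threadCount;
    }

    public string Name { get; private set; }
    public int ThreadCount { get; private set; }
}

// Hypothetical pool: threads it owns pull Actions from a shared queue.
public sealed class DedicatedThreadPool : IDisposable
{
    // No capacity argument, so the queue is unbounded and Add never blocks.
    private readonly BlockingCollection<Action> _work = new BlockingCollection<Action>();
    private readonly Thread[] _threads;

    public DedicatedThreadPool(DedicatedThreadPoolSettings settings)
    {
        _threads = new Thread[settings.ThreadCount];
        for (var i = 0; i < _threads.Length; i++)
        {
            _threads[i] = new Thread(RunWorker)
            {
                IsBackground = true,
                Name = settings.Name + "-" + i
            };
            _threads[i].Start();
        }
    }

    // Requirement 3: work is queued even if every thread is currently busy.
    public void QueueUserWorkItem(Action work)
    {
        if (work == null) throw new ArgumentNullException("work");
        _work.Add(work); // throws InvalidOperationException after Dispose
    }

    private void RunWorker()
    {
        foreach (var work in _work.GetConsumingEnumerable())
        {
            try { work(); }
            catch (Exception) { /* sketch only: keep the worker alive */ }
        }
    }

    // Stops accepting work, lets the workers drain the queue, then joins them.
    // Must not be called from one of the pool's own threads.
    public void Dispose()
    {
        _work.CompleteAdding();
        foreach (var thread in _threads) thread.Join();
    }
}

public static class Program
{
    public static void Main()
    {
        var completed = 0;
        using (var remoting = new DedicatedThreadPool(new DedicatedThreadPoolSettings("remoting", 2)))
        using (var cluster = new DedicatedThreadPool(new DedicatedThreadPoolSettings("cluster", 3)))
        {
            for (var i = 0; i < 50; i++)
            {
                remoting.QueueUserWorkItem(() => Interlocked.Increment(ref completed));
                cluster.QueueUserWorkItem(() => Interlocked.Increment(ref completed));
            }
        } // Dispose drains both queues before returning
        Console.WriteLine(completed); // prints 100
    }
}
```

A single shared queue keeps the sketch short, but every worker contends on it; a production version would likely need something smarter.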

Naive Implementation

Here's my own rough version of this idea, the DedicatedThreadPoolFiber I created as part of Helios:

https://github.com/helios-io/helios/blob/dev/src/Helios/Concurrency/Impl/DedicatedThreadPoolFiber.cs

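using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Helios.Concurrency.Impl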
{
    public class DedicatedThreadPoolFiber : IFiber
    {
        private readonly int _numThreads;
        private List<Thread> _threads;

        // Bounded queue: once 25,000 items are pending, Add() blocks the caller until a worker takes one.
        private readonly BlockingCollection<Action> _blockingCollection = new BlockingCollection<Action>(25000);

        public DedicatedThreadPoolFiber(int numThreads)
            : this(new BasicExecutor(), numThreads)
        {
        }

        public DedicatedThreadPoolFiber(IExecutor executor, int numThreads)
        {
            Executor = executor ?? new BasicExecutor();
            numThreads.NotNegative();
            numThreads.NotLessThan(1);
            _numThreads = numThreads;
            SpawnThreads(numThreads);
            Running = true;
        }

        protected void SpawnThreads(int threadCount)
        {
            _threads = new List<Thread>(threadCount);
            for (var i = 0; i < threadCount; i++)
            {

                var thread = new Thread(_ =>
                {
                    foreach (var task in _blockingCollection.GetConsumingEnumerable())
                    {
                        Executor.Execute(task);
                        if (!Executor.AcceptingJobs) return;
                    }
                }) { IsBackground = true };
                thread.Start();
                _threads.Add(thread);
            }

        }

        private volatile IExecutor _executor;
        public IExecutor Executor { get { return _executor; } set { _executor = value; } }
        public bool WasDisposed { get; private set; }

        public bool Running { get; set; }

        public void Add(Action op)
        {
            if (Running)
                _blockingCollection.Add(op);
        }

        public void SwapExecutor(IExecutor executor)
        {
            //Shut down the previous executor gracefully (in case there's thread-contention)
            Executor.GracefulShutdown(TimeSpan.FromSeconds(3));
            Executor = executor;
        }

        public void Shutdown(TimeSpan gracePeriod)
        {
            Running = false;
            Executor.Shutdown(gracePeriod);
        }

        public Task GracefulShutdown(TimeSpan gracePeriod)
        {
            Shutdown(gracePeriod);
            return TaskRunner.Delay(gracePeriod);
        }

        public void Stop()
        {
            Executor.Shutdown();
        }

        #region IDisposable members

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        public void Dispose(bool isDisposing)
        {
            if (!WasDisposed)
            {
                if (isDisposing)
                {
                    Shutdown(TimeSpan.Zero);
                    _threads = null;
                }
            }

            WasDisposed = true;
        }

        public IFiber Clone()
        {
            return new DedicatedThreadPoolFiber(Executor.Clone(), _numThreads);
        }

        #endregion
    }
}

Some of the Akka.NET folks don't like this because of the fixed size of the BlockingCollection (capped at 25,000 queued items here). But this meets the basic requirements (minus Task scheduling) I listed earlier.
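
To show what the fixed size means in practice, here is a small sketch with a capacity of 2 instead of 25000 (the class name BoundedQueueDemo is made up for the example). Once the collection is full, TryAdd returns false immediately, while Add would block the calling thread until a consumer removes an item.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; a capacity of 2 stands in for the 25000 used above.
public static class BoundedQueueDemo
{
    public static void Main()
    {
        var queue = new BlockingCollection<Action>(2);

        Console.WriteLine(queue.TryAdd(() => { })); // True
        Console.WriteLine(queue.TryAdd(() => { })); // True

        // The collection is now full and nothing is consuming from it.
        // TryAdd gives up immediately; Add() would block the caller here.
        Console.WriteLine(queue.TryAdd(() => { })); // False
    }
}
```

Constructing the collection without a capacity argument removes the bound, at the cost of letting the backlog grow without limit.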

What's a better way to build this?

@rogeralsing

The main issue I see is that there will be a lot of contention on that blocking queue when multiple threads compete to pick work from it.
I used the blocking collection at the very early stages of Akka.NET (or Pigeon, as it was then), and the perf was far from impressive.
So I'd like to see something that can compete with the standard threadpool in terms of perf.

Regarding running tasks, that is not up to the threadpool itself, it is up to whatever taskscheduler is active to run the job on our specific threadpools.
e.g. the async await support we have does this, so whatever thread the actor runs on, any non IO completion task will be scheduled on the same thread as the actor.
So we need a specific taskscheduler for this if we are not using our existing one.

@Aaronontheweb

"Regarding running tasks, that is not up to the threadpool itself, it is up to whatever taskscheduler is active to run the job on our specific threadpools."

I'm suggesting that we have a way of applying a TaskScheduler on top of the thread pool running inside a ForkJoinDispatcher specifically for the purpose of Context.Schedule - having those periodic heartbeat tasks running on the main ThreadPool still means that there's a chance of two nodes falsely marking each other as down.
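
A minimal sketch of what such a scheduler could look like, assuming a fixed set of dedicated threads and a simple shared queue. DedicatedThreadTaskScheduler is a hypothetical name, not an existing Akka.NET type; only the TaskScheduler members it overrides (QueueTask, TryExecuteTaskInline, GetScheduledTasks, MaximumConcurrencyLevel) come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: runs Tasks only on threads it owns.
public sealed class DedicatedThreadTaskScheduler : TaskScheduler, IDisposable
{
    // Set on each worker thread so inlining can be limited to our own threads.
    [ThreadStatic] private static DedicatedThreadTaskScheduler _owner;

    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly int _threadCount;

    public DedicatedThreadTaskScheduler(string name, int threadCount)
    {
        if (threadCount < 1) throw new ArgumentOutOfRangeException("threadCount");
        _threadCount = threadCount;
        for (var i = 0; i < threadCount; i++)
        {
            var thread = new Thread(RunWorker) { IsBackground = true, Name = name + "-" + i };
            thread.Start();
        }
    }

    public override int MaximumConcurrencyLevel { get { return _threadCount; } }

    private void RunWorker()
    {
        _owner = this;
        foreach (var task in _tasks.GetConsumingEnumerable())
            TryExecuteTask(task); // returns false if the task already ran inline
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Never let a foreign thread (e.g. a ThreadPool thread) run our tasks.
        return _owner == this && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    // Stops accepting tasks; workers exit once the queue is drained.
    public void Dispose()
    {
        _tasks.CompleteAdding();
    }
}

public static class Program
{
    public static void Main()
    {
        using (var scheduler = new DedicatedThreadTaskScheduler("heartbeats", 2))
        {
            var factory = new TaskFactory(scheduler);
            var threadName = factory.StartNew(() => Thread.CurrentThread.Name).Result;
            Console.WriteLine(threadName); // one of the "heartbeats-N" threads
        }
    }
}
```

Anything started through that TaskFactory stays off the main ThreadPool, so a periodic heartbeat scheduled this way would not be delayed by saturation there. Timers that fire on the ThreadPool would still need to hand their callbacks to this scheduler.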
