Skip to content

Instantly share code, notes, and snippets.

@Kikimora
Created May 5, 2020 21:58
Show Gist options
  • Save Kikimora/594a9d36fdb630bd8fad77eb26bbc925 to your computer and use it in GitHub Desktop.
Save Kikimora/594a9d36fdb630bd8fad77eb26bbc925 to your computer and use it in GitHub Desktop.
namespace Hmmm
{
class SequentialTaskScheduler : TaskScheduler
{
private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
private readonly TaskScheduler _executor;
private int _isRunning;
public SequentialTaskScheduler() : this(Default)
{
}
public SequentialTaskScheduler(TaskScheduler executor)
{
_executor = executor;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override void QueueTask(Task task)
{
_tasks.Enqueue(task);
if (Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0)
{
Task.Factory.StartNew(ProcessQueue, this, default, TaskCreationOptions.DenyChildAttach, _executor);
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
private static void ProcessQueue(object state)
{
var scheduler = (SequentialTaskScheduler) state;
if (ScheduleNextTask(scheduler)) return;
//No tasks was scheduled for execution. Indicate that processing is not running anymore by setting _isRunning to 0.
//Other thread(s) might have been adding items into the queue while below line executes.
Interlocked.Exchange(ref scheduler._isRunning, 0);
//If some other thread add item to the queue after this line then processing will be triggered.
//Deal with case when other thread added item while this thread have being updating _isRunning.
//Note that Interlocked.CompareExchange guarantees that either this method or QueueTask will run processing but not both.
if (scheduler._tasks.Count > 0 && Interlocked.CompareExchange(ref scheduler._isRunning, 1, 0) == 0)
{
ScheduleNextTask(scheduler);
}
}
private static bool ScheduleNextTask(SequentialTaskScheduler scheduler)
{
if (!scheduler._tasks.TryDequeue(out var task)) return false;
task.ContinueWith(x => ProcessQueue(scheduler));
task.Start(scheduler._executor);
return true;
}
}
public class Example
{
public static void ExampleUsage()
{
var scheduler = new SequentialTaskScheduler();
//Action execute one after another.
Task.Factory.StartNew(() => AsyncAction(), default, TaskCreationOptions.None, scheduler);
Task.Factory.StartNew(() => AsyncAction(), default, TaskCreationOptions.None, scheduler);
Task.Factory.StartNew(() => AsyncAction(), default, TaskCreationOptions.None, scheduler);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment