Created
May 5, 2020 21:58
-
-
Save Kikimora/594a9d36fdb630bd8fad77eb26bbc925 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Hmmm | |
{ | |
/// <summary>
/// A <see cref="TaskScheduler"/> that runs the tasks queued to it one at a time, in FIFO order,
/// using another scheduler (the "executor") to supply the threads the work runs on.
/// </summary>
class SequentialTaskScheduler : TaskScheduler
{
    // Tasks waiting to be executed, in the order they were queued.
    private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();

    // Scheduler on which the queue-processing work items are run.
    private readonly TaskScheduler _executor;

    // 1 while a processing work item is running or has been posted to the executor; 0 otherwise.
    // Only ever changed through Interlocked operations.
    private int _isRunning;

    /// <summary>Creates a scheduler that executes its tasks on <see cref="TaskScheduler.Default"/>.</summary>
    public SequentialTaskScheduler() : this(Default)
    {
    }

    /// <summary>Creates a scheduler that executes its tasks on <paramref name="executor"/>.</summary>
    /// <param name="executor">Scheduler that runs the queue-processing work items.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="executor"/> is null.</exception>
    public SequentialTaskScheduler(TaskScheduler executor)
    {
        // Fail at construction time instead of on the first QueueTask call, which would
        // leave _isRunning stuck at 1 with a task stranded in the queue.
        _executor = executor ?? throw new System.ArgumentNullException(nameof(executor));
    }

    /// <summary>Returns a snapshot of the tasks that are queued but not yet dequeued for execution.</summary>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    /// <summary>
    /// Enqueues <paramref name="task"/> and starts queue processing if it is not already active.
    /// </summary>
    /// <param name="task">The task to run after all previously queued tasks.</param>
    protected override void QueueTask(Task task)
    {
        _tasks.Enqueue(task);

        // Only the caller that flips _isRunning from 0 to 1 posts a processing work item.
        if (Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0)
        {
            StartProcessing();
        }
    }

    /// <summary>Always declines inline execution so that tasks run strictly through the queue.</summary>
    /// <returns><c>false</c>.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Posts one ProcessQueue work item to the executor. Callers must own _isRunning == 1.
    private void StartProcessing()
    {
        Task.Factory.StartNew(ProcessQueue, this, default, TaskCreationOptions.DenyChildAttach, _executor);
    }

    // Runs at most one queued task, then either re-posts itself or marks processing as stopped.
    private static void ProcessQueue(object state)
    {
        var scheduler = (SequentialTaskScheduler) state;

        if (scheduler._tasks.TryDequeue(out var task))
        {
            try
            {
                // A task handed to QueueTask is already started and bound to this scheduler, so
                // Task.Start would throw InvalidOperationException. TryExecuteTask is the way a
                // scheduler runs a task that was queued to it.
                scheduler.TryExecuteTask(task);
            }
            finally
            {
                // The next work item is posted only after the current task has finished, so two
                // tasks never overlap. _isRunning stays 1 for the whole hand-over.
                scheduler.StartProcessing();
            }

            return;
        }

        // Nothing was dequeued: indicate that processing is no longer running.
        Interlocked.Exchange(ref scheduler._isRunning, 0);

        // A task enqueued after the line above triggers processing from QueueTask. A task enqueued
        // between the failed TryDequeue and the line above saw _isRunning == 1 and did not, so
        // re-check here. CompareExchange guarantees that either this method or QueueTask restarts
        // processing, but never both.
        if (!scheduler._tasks.IsEmpty && Interlocked.CompareExchange(ref scheduler._isRunning, 1, 0) == 0)
        {
            scheduler.StartProcessing();
        }
    }
}
/// <summary>Shows how to queue work onto a <c>SequentialTaskScheduler</c>.</summary>
public class Example
{
    /// <summary>
    /// Starts three tasks on a single <c>SequentialTaskScheduler</c> instance without waiting for them.
    /// </summary>
    public static void ExampleUsage()
    {
        const int actionCount = 3;
        var sequential = new SequentialTaskScheduler();

        // The actions are intended to execute one after another.
        for (var started = 0; started < actionCount; started++)
        {
            Task.Factory.StartNew(() => AsyncAction(), default, TaskCreationOptions.None, sequential);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment