Skip to content

Instantly share code, notes, and snippets.

@NtFreX
Created January 4, 2018 09:15
Show Gist options
  • Save NtFreX/337151cd78330b7efa0acd2fea70d291 to your computer and use it in GitHub Desktop.
Save NtFreX/337151cd78330b7efa0acd2fea70d291 to your computer and use it in GitHub Desktop.
public class TaskQueue
{
public bool IsRunning => _longRunningTasks.Count > 0 || _semaphoreSlim.CurrentCount != _maxConcurentActions;
public event EventHandler<bool> IsRunningChanged;
private bool _oldIsRunningValue;
private readonly int _maxConcurentActions;
private readonly Queue<LongRunningTask> _longRunningTasks;
private readonly SemaphoreSlim _semaphoreSlim;
public TaskQueue(int maxConcurentActions)
{
_maxConcurentActions = maxConcurentActions;
_semaphoreSlim = new SemaphoreSlim(maxConcurentActions);
_longRunningTasks = new Queue<LongRunningTask>();
}
public async void Enqueue(Action action)
=> await EnqueueAsync(action);
public async void Enqueue(Action action, bool canCancel)
=> await EnqueueAsync(action, canCancel);
public async void Enqueue(Action<CancellationToken> action)
=> await EnqueueAsync(action);
public async void Enqueue(Action<CancellationToken> action, bool canCancel)
=> await EnqueueAsync(action, canCancel);
public async void Enqueue(Func<CancellationToken, Task> action)
=> await EnqueueAsync(action);
public async void Enqueue(Func<CancellationToken, Task> action, bool canCancel)
=> await EnqueueAsync(action, canCancel);
public async void Enqueue(Func<Task> action)
=> await EnqueueAsync(action);
public async void Enqueue(Func<Task> action, bool canCancel)
=> await EnqueueAsync(action, canCancel);
public async Task EnqueueAsync(Action action)
=> await EnqueueAsync(action, true);
public async Task EnqueueAsync(Action action, bool canCancel)
=> await EnqueueAsync(cancellationToken => action(), canCancel);
public async Task EnqueueAsync(Action<CancellationToken> action)
=> await EnqueueAsync(action, true);
public async Task EnqueueAsync(Action<CancellationToken> action, bool canCancel)
=> await EnqueueAsync(async cancellationToken =>
{
action(cancellationToken);
await Task.CompletedTask;
}, canCancel);
public async Task EnqueueAsync(Func<Task> action)
=> await EnqueueAsync(action, true);
public async Task EnqueueAsync(Func<Task> action, bool canCancel)
=> await EnqueueAsync(async cancelationToken => await action(), canCancel);
public async Task EnqueueAsync(Func<CancellationToken, Task> action)
=> await EnqueueAsync(action, true);
public async Task EnqueueAsync(Func<CancellationToken, Task> action, bool canCancel)
{
var id = Guid.NewGuid();
_longRunningTasks.Enqueue(new LongRunningTask
{
Id = id,
CanCancel = canCancel,
CancellationTokenSource = new CancellationTokenSource(),
Action = action
});
if (_oldIsRunningValue != true)
{
_oldIsRunningValue = true;
IsRunningChanged?.Invoke(this, true);
}
var nextId = Guid.Empty;
while (nextId != id)
{
await _semaphoreSlim.WaitAsync();
nextId = _longRunningTasks.Peek().Id;
}
var myTask = _longRunningTasks.Dequeue();
if (!myTask.CancellationTokenSource.IsCancellationRequested)
{
await myTask.Action(myTask.CancellationTokenSource.Token);
}
_semaphoreSlim.Release();
if (!IsRunning && _oldIsRunningValue)
{
_oldIsRunningValue = false;
IsRunningChanged?.Invoke(this, false);
}
}
public void CancleAllCancelables()
{
foreach (var item in _longRunningTasks)
{
if (item.CanCancel)
{
item.CancellationTokenSource.Cancel();
}
}
}
private class LongRunningTask
{
public Guid Id { get; set; }
public bool CanCancel { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
public Func<CancellationToken, Task> Action { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment