Skip to content

Instantly share code, notes, and snippets.

@radleta
Created November 18, 2019 13:50
Show Gist options
  • Save radleta/bbc3bbb7234c7003044f6969f4257ce6 to your computer and use it in GitHub Desktop.
Save radleta/bbc3bbb7234c7003044f6969f4257ce6 to your computer and use it in GitHub Desktop.
Cargo is a fire-and-forget processing queue that ensures items are run in payloads of a fixed size when possible, based on concurrency and delay.
using System.Collections.Concurrent;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
namespace RichardAdleta
{
/// <summary>
/// Cargo is a fire-and-forget processing queue that hands queued items to a callback in
/// payloads of a fixed size when possible, bounded by a maximum concurrency and an optional
/// delay that gives partial payloads time to fill up.
/// </summary>
/// <typeparam name="TValue">The type of items to be processed.</typeparam>
public class Cargo<TValue> : IDisposable
{
    /// <summary>How long <see cref="WaitAsync"/> sleeps before re-checking state when no completion signal arrives.</summary>
    private static readonly TimeSpan RecheckInterval = TimeSpan.FromMilliseconds(5000);

    /// <summary>Smallest delay step; anything shorter is truncated to zero by <c>Task.Delay</c> and would spin.</summary>
    private static readonly TimeSpan MinDelayStep = TimeSpan.FromMilliseconds(1);

    /// <summary>Largest delay step accepted by every <c>Task.Delay</c> implementation.</summary>
    private static readonly TimeSpan MaxDelayStep = TimeSpan.FromMilliseconds(int.MaxValue);

    // Written by Dispose and read by worker tasks on other threads, hence volatile.
    private volatile bool _disposedValue;
    private readonly CancellationTokenSource _disposeCancellationTokenSource = new CancellationTokenSource();

    // Cached once: reading CancellationTokenSource.Token after the source has been disposed throws,
    // whereas a previously obtained token stays usable.
    private readonly CancellationToken _disposeToken;

    private readonly Func<List<TValue>, CancellationToken, Task> _cargo;
    private readonly int _payload;
    private readonly int _concurrency;
    private readonly TimeSpan _timeout;
    private readonly TimeSpan _delay;
    private readonly bool _hasDelay;
    private readonly ConcurrentQueue<TValue> _queue = new ConcurrentQueue<TValue>();

    // Signalled by EnsureTaskRunning when at least one full payload is queued, to wake delayed workers.
    private readonly AsyncAutoResetEvent _moreThanPayloadEvent = new AsyncAutoResetEvent();

    // Signalled every time a worker task exits, to wake WaitAsync callers.
    private readonly AsyncAutoResetEvent _taskCompletedEvent = new AsyncAutoResetEvent();

    // Number of worker tasks currently alive; only modified through Interlocked operations.
    private int _activeTask;

    /// <summary>
    /// Gets the number of items currently waiting in the queue (items already handed to a payload are not counted).
    /// </summary>
    public int Count => _queue.Count;

    /// <summary>
    /// Initializes a new instance of this class.
    /// </summary>
    /// <param name="cargo">The func to call to process a payload of <typeparamref name="TValue"/> instances.</param>
    /// <param name="payload">The size of the payload to send to <c>cargo</c>. Must be equal to or greater than 1.</param>
    /// <param name="concurrency">The maximum number of concurrent tasks processing the queue at any one time. Must be equal to or greater than 1.</param>
    /// <param name="delay">How long a worker waits for more items when fewer than <paramref name="payload"/> items are queued. Zero or negative disables the wait.</param>
    /// <param name="timeout">The timeout of the <see cref="CancellationToken"/> passed to cargo on each execute. Must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="payload"/> or <paramref name="concurrency"/> is less than one, or <paramref name="timeout"/> is not positive.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="cargo"/> is <c>null</c>.</exception>
    public Cargo(Func<List<TValue>, CancellationToken, Task> cargo, int payload, int concurrency, TimeSpan delay, TimeSpan timeout)
    {
        if (payload < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(payload), payload, "Value cannot be less than one.");
        }
        if (concurrency < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(concurrency), concurrency, "Value cannot be less than one.");
        }
        if (timeout <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Value cannot be equal or less than zero.");
        }

        _cargo = cargo ?? throw new ArgumentNullException(nameof(cargo));
        _payload = payload;
        _concurrency = concurrency;
        _timeout = timeout;
        _delay = delay;
        _hasDelay = delay > TimeSpan.Zero;
        _disposeToken = _disposeCancellationTokenSource.Token;
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> when this instance has been disposed.
    /// </summary>
    private void ThrowObjectDisposedExceptionWhenDisposed()
    {
        if (_disposedValue)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
    }

    /// <summary>
    /// Enqueues a single item.
    /// </summary>
    /// <param name="value">The item to enqueue.</param>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public void Enqueue(TValue value)
    {
        ThrowObjectDisposedExceptionWhenDisposed();

        _queue.Enqueue(value);

        // ensure a task is spawned to process the queue
        EnsureTaskRunning();
    }

    /// <summary>
    /// Enqueues multiple items.
    /// </summary>
    /// <param name="values">The values to enqueue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="values"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public void Enqueue(IEnumerable<TValue> values)
    {
        if (values is null)
        {
            throw new ArgumentNullException(nameof(values));
        }

        ThrowObjectDisposedExceptionWhenDisposed();

        foreach (var value in values)
        {
            _queue.Enqueue(value);
        }

        // ensure a task is spawned to process the queue
        EnsureTaskRunning();
    }

    /// <summary>
    /// Determines whether or not the cargo is currently processing items.
    /// </summary>
    /// <returns><c>True</c> when a worker task is active or items are still queued; otherwise, <c>false</c>.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public bool IsRunning()
    {
        ThrowObjectDisposedExceptionWhenDisposed();

        return Volatile.Read(ref _activeTask) > 0
            || !_queue.IsEmpty;
    }

    /// <summary>
    /// Waits for the cargo to finish processing items.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>
    /// <c>True</c> when the queue drained and no worker is active; <c>false</c> when
    /// <paramref name="cancellationToken"/> was cancelled first.
    /// </returns>
    /// <exception cref="ObjectDisposedException">This instance was disposed before or during the wait.</exception>
    public async Task<bool> WaitAsync(System.Threading.CancellationToken cancellationToken)
    {
        ThrowObjectDisposedExceptionWhenDisposed();

        while (IsRunning())
        {
            // give up when the caller cancelled
            if (cancellationToken.IsCancellationRequested)
            {
                return false;
            }

            // just check to make sure a task is running
            EnsureTaskRunning();

            // A per-iteration token lets us cancel whichever wait loses the race. Without it the
            // abandoned event waiter would stay queued on the auto-reset event and swallow a later
            // signal, and the abandoned timer would linger until it fired.
            using (var waitCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposeToken))
            {
                try
                {
                    await Task.WhenAny(
                        // periodic re-check in case a completion signal went to another waiter
                        Task.Delay(RecheckInterval, waitCts.Token),
                        // wait for a worker task to complete
                        _taskCompletedEvent.WaitAsync(waitCts.Token)
                    ).ConfigureAwait(false);
                }
                finally
                {
                    waitCts.Cancel();
                }
            }
        }

        // success, we've waited till we weren't running
        return true;
    }

    /// <summary>
    /// Ensures a task is running to process the queue, and wakes delayed workers once a full payload is available.
    /// </summary>
    private void EnsureTaskRunning()
    {
        // Wake any worker that is sleeping in the delay loop. The worker sleeps while
        // Count < payload, so it must be signalled as soon as Count >= payload.
        if (_hasDelay
            && Volatile.Read(ref _activeTask) > 0
            && _queue.Count >= _payload)
        {
            _moreThanPayloadEvent.Set();
        }

        // Try to claim a worker slot while there is work and we're below the concurrency limit.
        while (!_disposedValue && !_queue.IsEmpty)
        {
            var currentActiveTask = Volatile.Read(ref _activeTask);
            if (currentActiveTask >= _concurrency)
            {
                return;
            }

            // Only the thread that wins the compare-exchange spawns the worker, which keeps the
            // number of workers within the limit; losers re-read the counter and try again.
            if (Interlocked.CompareExchange(ref _activeTask, currentActiveTask + 1, currentActiveTask) == currentActiveTask)
            {
                // fire and forget: ExecuteAsync handles its own exceptions
                _ = Task.Run(ExecuteAsync);
                return;
            }
        }
    }

    /// <summary>
    /// The main execution method for cargo. It loops, building payloads and passing them to the
    /// cargo func, until there is no more work or this instance is disposed.
    /// </summary>
    /// <returns>Awaitable.</returns>
    private async Task ExecuteAsync()
    {
        try
        {
            while (!_disposedValue && !_queue.IsEmpty)
            {
                // with less than a full payload queued, give more items a chance to arrive
                if (_hasDelay && _queue.Count < _payload)
                {
                    await WaitForPayloadOrDelayAsync().ConfigureAwait(false);

                    // check to see whether we're disposed and bail
                    if (_disposedValue)
                    {
                        return;
                    }
                }

                // build a payload from the items in the queue
                var payload = new List<TValue>();
                while (!_disposedValue
                    && payload.Count < _payload
                    && _queue.TryDequeue(out TValue value))
                {
                    payload.Add(value);
                }

                // check to see whether we're disposed and bail
                if (_disposedValue)
                {
                    return;
                }

                // another worker may have drained the queue first; nothing left to do
                if (payload.Count == 0)
                {
                    break;
                }

                try
                {
                    // the token handed to cargo fires on the per-call timeout or on dispose
                    using (var timeoutCts = new CancellationTokenSource(_timeout))
                    using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, _disposeToken))
                    {
                        await _cargo(payload, linkedCts.Token).ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    // a failing payload must not stop the worker; report it and move on
                    TelemetryClientManager.Default.TrackException(ex);
                }
            }
        }
        catch (Exception ex)
        {
            TelemetryClientManager.Default.TrackException(ex);
        }
        finally
        {
            // release this worker's slot
            Interlocked.Decrement(ref _activeTask);

            // items may have been enqueued while we were shutting down,
            // so check whether another worker should be spawned
            EnsureTaskRunning();

            // denote a task completed
            _taskCompletedEvent.Set();
        }
    }

    /// <summary>
    /// Sleeps until a full payload is queued, the configured delay has elapsed, or this instance is disposed.
    /// </summary>
    /// <returns>Awaitable.</returns>
    private async Task WaitForPayloadOrDelayAsync()
    {
        var elapsed = System.Diagnostics.Stopwatch.StartNew();

        while (!_disposedValue && _queue.Count < _payload)
        {
            var remaining = _delay - elapsed.Elapsed;
            if (remaining <= TimeSpan.Zero)
            {
                break;
            }

            // Clamp the step: sub-millisecond waits would complete immediately and spin, and
            // very large values are rejected by Task.Delay. The loop re-checks afterwards anyway.
            if (remaining < MinDelayStep)
            {
                remaining = MinDelayStep;
            }
            else if (remaining > MaxDelayStep)
            {
                remaining = MaxDelayStep;
            }

            // Per-wait token so the losing wait (timer or event waiter) is cancelled instead of
            // being left queued on the auto-reset event where it would consume a later signal.
            using (var waitCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeToken))
            {
                try
                {
                    // go to sleep here waiting for either our time to be up
                    // or there is enough work to do
                    await Task.WhenAny(
                        Task.Delay(remaining, waitCts.Token),
                        _moreThanPayloadEvent.WaitAsync(waitCts.Token)
                    ).ConfigureAwait(false);
                }
                finally
                {
                    waitCts.Cancel();
                }
            }
        }
    }

    #region IDisposable Support
    /// <summary>
    /// Marks this instance as disposed and, when called from <see cref="Dispose()"/>, cancels
    /// in-flight work and releases the dispose cancellation source.
    /// </summary>
    /// <param name="disposing"><c>True</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the finalizer.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposedValue)
        {
            return;
        }

        // flip the flag first so workers woken by the cancellation below see the disposed state
        _disposedValue = true;

        // managed objects must not be touched from the finalizer
        if (disposing)
        {
            try
            {
                // signal running cargo calls and delay waits before releasing the source
                _disposeCancellationTokenSource.Cancel();
            }
            catch (AggregateException ex)
            {
                // a cancellation callback threw; Dispose itself must not throw
                TelemetryClientManager.Default.TrackException(ex);
            }
            finally
            {
                _disposeCancellationTokenSource.Dispose();
            }
        }
    }

    ~Cargo()
    {
        // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        Dispose(false);
    }

    /// <summary>
    /// Stops processing: cancels in-flight cargo calls and prevents further payloads from being started.
    /// Items still in the queue are not processed.
    /// </summary>
    public void Dispose()
    {
        // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    #endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment