Skip to content

Instantly share code, notes, and snippets.

@davideicardi
Last active June 5, 2017 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davideicardi/4815ac215061dfd32f8221a3331a303b to your computer and use it in GitHub Desktop.
Save davideicardi/4815ac215061dfd32f8221a3331a303b to your computer and use it in GitHub Desktop.
C# throttled worker with duplicate detection
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncThrottledSetSample
{
class Program
{
static void Main()
{
Console.WriteLine("'q' to exit ....");
using (var worker = new ThrottledWorker<string>((e) =>
{
Console.WriteLine("PROCESSING " + e);
return Task.FromResult(false);
}, 5000))
{
while (true)
{
var item = Console.ReadLine();
if (item == "q")
return;
if (!string.IsNullOrWhiteSpace(item))
worker.PushWork(item);
}
}
}
}
public sealed class ThrottledWorker<T> : IDisposable
{
private readonly NoDuplicatesConcurrentQueue _queue;
private readonly Task _task;
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
public ThrottledWorker(Func<T, Task> doWorkAsync, int throttlingMs)
: this(doWorkAsync, throttlingMs, EqualityComparer<T>.Default)
{
}
public ThrottledWorker(Func<T, Task> doWorkAsync, int throttlingMs, IEqualityComparer<T> comparer)
{
_queue = new NoDuplicatesConcurrentQueue(comparer);
_task = StartAsync(doWorkAsync, throttlingMs);
}
public bool PushWork(T item)
{
return _queue.TryEnqueue(item);
}
private Task StartAsync(Func<T, Task> doWorkAsync, int throttlingMs)
{
var cancellationToken = _cancellation.Token;
return Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
T item;
while (_queue.TryDequeue(out item) && !cancellationToken.IsCancellationRequested)
{
await doWorkAsync(item);
}
await Task.Delay(throttlingMs, cancellationToken);
}
}, cancellationToken);
}
public void Dispose()
{
if (_task == null)
return;
_cancellation.Cancel();
try
{
_task.Wait();
}
catch (AggregateException) // I should handle only AggregateException of OperationCanceledException
{
}
catch (OperationCanceledException)
{
}
}
private class NoDuplicatesConcurrentQueue
{
private readonly object _sync = new object();
private readonly Queue<T> _queue = new Queue<T>();
private readonly IEqualityComparer<T> _comparer;
public NoDuplicatesConcurrentQueue(IEqualityComparer<T> comparer)
{
_comparer = comparer;
}
public bool TryEnqueue(T item)
{
lock (_sync)
{
if (_queue.Contains(item, _comparer))
return false;
_queue.Enqueue(item);
return true;
}
}
public bool TryDequeue(out T item)
{
lock (_sync)
{
item = default(T);
if (_queue.Count > 0)
{
item = _queue.Dequeue();
}
return item != null;
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment