Last active
June 5, 2017 14:33
-
-
Save davideicardi/4815ac215061dfd32f8221a3331a303b to your computer and use it in GitHub Desktop.
C# throttled worker with duplicate detection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace AsyncThrottledSetSample | |
{ | |
class Program | |
{ | |
static void Main() | |
{ | |
Console.WriteLine("'q' to exit ...."); | |
using (var worker = new ThrottledWorker<string>((e) => | |
{ | |
Console.WriteLine("PROCESSING " + e); | |
return Task.FromResult(false); | |
}, 5000)) | |
{ | |
while (true) | |
{ | |
var item = Console.ReadLine(); | |
if (item == "q") | |
return; | |
if (!string.IsNullOrWhiteSpace(item)) | |
worker.PushWork(item); | |
} | |
} | |
} | |
} | |
public sealed class ThrottledWorker<T> : IDisposable | |
{ | |
private readonly NoDuplicatesConcurrentQueue _queue; | |
private readonly Task _task; | |
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); | |
public ThrottledWorker(Func<T, Task> doWorkAsync, int throttlingMs) | |
: this(doWorkAsync, throttlingMs, EqualityComparer<T>.Default) | |
{ | |
} | |
public ThrottledWorker(Func<T, Task> doWorkAsync, int throttlingMs, IEqualityComparer<T> comparer) | |
{ | |
_queue = new NoDuplicatesConcurrentQueue(comparer); | |
_task = StartAsync(doWorkAsync, throttlingMs); | |
} | |
public bool PushWork(T item) | |
{ | |
return _queue.TryEnqueue(item); | |
} | |
private Task StartAsync(Func<T, Task> doWorkAsync, int throttlingMs) | |
{ | |
var cancellationToken = _cancellation.Token; | |
return Task.Run(async () => | |
{ | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
T item; | |
while (_queue.TryDequeue(out item) && !cancellationToken.IsCancellationRequested) | |
{ | |
await doWorkAsync(item); | |
} | |
await Task.Delay(throttlingMs, cancellationToken); | |
} | |
}, cancellationToken); | |
} | |
public void Dispose() | |
{ | |
if (_task == null) | |
return; | |
_cancellation.Cancel(); | |
try | |
{ | |
_task.Wait(); | |
} | |
catch (AggregateException) // I should handle only AggregateException of OperationCanceledException | |
{ | |
} | |
catch (OperationCanceledException) | |
{ | |
} | |
} | |
private class NoDuplicatesConcurrentQueue | |
{ | |
private readonly object _sync = new object(); | |
private readonly Queue<T> _queue = new Queue<T>(); | |
private readonly IEqualityComparer<T> _comparer; | |
public NoDuplicatesConcurrentQueue(IEqualityComparer<T> comparer) | |
{ | |
_comparer = comparer; | |
} | |
public bool TryEnqueue(T item) | |
{ | |
lock (_sync) | |
{ | |
if (_queue.Contains(item, _comparer)) | |
return false; | |
_queue.Enqueue(item); | |
return true; | |
} | |
} | |
public bool TryDequeue(out T item) | |
{ | |
lock (_sync) | |
{ | |
item = default(T); | |
if (_queue.Count > 0) | |
{ | |
item = _queue.Dequeue(); | |
} | |
return item != null; | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment