Skip to content

Instantly share code, notes, and snippets.

@AndyPook
Last active March 9, 2016 14:05
Show Gist options
  • Save AndyPook/cc91f90a20c807e39573 to your computer and use it in GitHub Desktop.
Save AndyPook/cc91f90a20c807e39573 to your computer and use it in GitHub Desktop.
Separates a producer (e.g. a network receiver) from a potentially more expensive consumer by queueing items between them.
/// <summary>
/// Decouples a producer (e.g. a network receiver) from a potentially more
/// expensive consumer: items passed to <see cref="Add"/> are queued and handed
/// to the supplied handler by a background <see cref="Parallel"/> loop.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class QueueHandler<T>
{
    private readonly Action<T> handler;
    private readonly CancellationToken token;
    private readonly ParallelOptions options;
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    /// <summary>
    /// Creates a handler queue and immediately starts its consuming loop.
    /// </summary>
    /// <param name="handler">Delegate invoked for every queued item.</param>
    /// <param name="token">Optional token that stops the loop and aborts blocked <see cref="Add"/> calls.</param>
    /// <param name="options">Optional options forwarded to <c>Parallel.ForEach</c>.</param>
    /// <returns>The started queue.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public static QueueHandler<T> Start(Action<T> handler, CancellationToken? token = null, ParallelOptions options = null)
    {
        var q = new QueueHandler<T>(handler, token, options);
        q.Start();
        return q;
    }

    /// <summary>
    /// Creates a handler queue without starting it; call <see cref="Start()"/> to begin consuming.
    /// </summary>
    /// <param name="handler">Delegate invoked for every queued item.</param>
    /// <param name="token">Optional cancellation token; defaults to <see cref="CancellationToken.None"/>.</param>
    /// <param name="options">Optional options forwarded to <c>Parallel.ForEach</c>; defaults to a new <see cref="ParallelOptions"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public QueueHandler(Action<T> handler, CancellationToken? token = null, ParallelOptions options = null)
    {
        if (handler == null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        this.handler = handler;
        this.token = token ?? CancellationToken.None;
        this.options = options ?? new ParallelOptions();
    }

    /// <summary>
    /// Starts the consuming loop on a long-running task and logs its outcome when it ends.
    /// </summary>
    public void Start()
    {
        // The continuation deliberately does NOT take the queue's token: a
        // continuation created with an already-cancelled token is cancelled
        // instead of run, which would leave the loop's exception unobserved
        // and the completion message unlogged.
        Task.Factory
            .StartNew(Loop, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
            .ContinueWith(
                t => LogCompletion(t),
                CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
    }

    /// <summary>
    /// Queues an item for the handler.
    /// </summary>
    /// <param name="item">Item to enqueue.</param>
    /// <exception cref="OperationCanceledException">The queue's token has been cancelled.</exception>
    public void Add(T item)
    {
        queue.Add(item, token);
    }

    // Consumes the queue until the token is cancelled, dispatching items to the handler in parallel.
    private void Loop()
    {
        // NoBuffering: take items from the blocking enumerable one at a time.
        // The default (buffered) partitioner grabs chunks, so an item can sit
        // in a partially filled chunk until later items arrive.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(token),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, options, handler);
    }

    // Logs every real failure of the loop task, then the completion message.
    private void LogCompletion(Task loopTask)
    {
        var failure = loopTask.Exception; // reading it marks the exception as observed
        if (failure != null)
        {
            // Flatten so the actual handler exceptions are logged rather than
            // the generic AggregateException message.
            foreach (var inner in failure.Flatten().InnerExceptions)
            {
                // Cancellation requested through our own token is the normal
                // way to stop the loop, not an error.
                if (inner is OperationCanceledException && token.IsCancellationRequested)
                {
                    continue;
                }

                Trace.TraceError(inner.ToString());
            }
        }

        Trace.TraceInformation("HandlerLoop finished");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment