Skip to content

Instantly share code, notes, and snippets.

@valerysntx
Created April 26, 2016 13:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save valerysntx/1cb73ae17e71b8123b1cd4d4dd18c9d8 to your computer and use it in GitHub Desktop.
Save valerysntx/1cb73ae17e71b8123b1cd4d4dd18c9d8 to your computer and use it in GitHub Desktop.
ProducerConsumerQueue.cs
public class ProducerConsumerQueue : IDisposable
{
/// <summary>
/// thread blocking collection
/// </summary>
BlockingCollection<WorkItem> taskCollection = new BlockingCollection<WorkItem>();
/// <summary>
/// task continuations and
/// </summary>
class WorkItem
{
public readonly TaskCompletionSource<object> TaskSource;
public readonly Action Action;
public readonly CancellationToken? CancelToken;
public WorkItem( TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken = null)
{
TaskSource = taskSource;
Action = action;
CancelToken = cancelToken ?? CancellationToken.None;
}
}
private ProducerConsumerQueue( int workerCount )
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++) Task.Run(() => Consume());
}
public void Dispose() { taskCollection.CompleteAdding(); }
public Task EnqueueTask( Action action_in )
{
return EnqueueTask(action_in, null);
}
/// <summary>
/// background the messaging handler from intense work
/// </summary>
/// <param name="action"></param>
/// <param name="cancelToken"></param>
/// <returns></returns>
public Task EnqueueTask( Action action, CancellationToken? cancelToken )
{
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
taskCollection.Add(new WorkItem(tcs, action, cancelToken));
return tcs.Task;
}
/// <summary>
/// the separate thread of tasks consumer
/// </summary>
void Consume()
{
foreach (WorkItem item in taskCollection.GetConsumingEnumerable())
{
if (item.CancelToken.HasValue && item.CancelToken.Value.IsCancellationRequested)
{
item.TaskSource.SetCanceled();
}
else
{
try
{
item.Action();
item.TaskSource.SetResult(null); // Indicate completion
}
catch (OperationCanceledException ex)
{
if (ex.CancellationToken == item.CancelToken)
{
item.TaskSource.SetCanceled();
}
else
{
item.TaskSource.SetException(ex);
}
}
// ReSharper disable once CatchAllClause
catch (Exception ex)
{
item.TaskSource.SetException(ex);
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment