Created
April 26, 2016 13:41
-
-
Save valerysntx/1cb73ae17e71b8123b1cd4d4dd18c9d8 to your computer and use it in GitHub Desktop.
ProducerConsumerQueue.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A producer/consumer work queue: callers enqueue <see cref="Action"/> delegates and a fixed
/// number of dedicated consumers execute them in the background. Every enqueued action is
/// represented by a <see cref="Task"/> that completes, faults, or is canceled according to
/// the outcome of the action.
/// </summary>
public class ProducerConsumerQueue : IDisposable
{
    /// <summary>
    /// Blocking collection holding the pending work items; consumers block on it while it is empty.
    /// </summary>
    private readonly BlockingCollection<WorkItem> taskCollection = new BlockingCollection<WorkItem>();

    /// <summary>
    /// A queued unit of work: the action to run, the completion source that reports its
    /// outcome to the caller, and the cancellation token associated with it.
    /// </summary>
    private sealed class WorkItem
    {
        public readonly TaskCompletionSource<object> TaskSource;
        public readonly Action Action;

        // Never null: a missing token is normalized to CancellationToken.None in the constructor.
        public readonly CancellationToken CancelToken;

        public WorkItem(TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken = null)
        {
            TaskSource = taskSource;
            Action = action;
            CancelToken = cancelToken ?? CancellationToken.None;
        }
    }

    /// <summary>
    /// Creates the queue and starts its consumers.
    /// </summary>
    /// <param name="workerCount">Number of consumers to start; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="workerCount"/> is less than 1 (no consumer would ever run the queued work).
    /// </exception>
    public ProducerConsumerQueue(int workerCount)
    {
        if (workerCount < 1)
        {
            throw new ArgumentOutOfRangeException("workerCount", workerCount, "At least one worker is required.");
        }

        // Each consumer blocks in GetConsumingEnumerable() for the lifetime of the queue,
        // so ask for LongRunning tasks instead of tying up ordinary thread-pool workers.
        for (int i = 0; i < workerCount; i++)
        {
            Task.Factory.StartNew(
                Consume,
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Marks the queue as complete for adding. Items already queued are still processed,
    /// after which the consumers leave their loops. Enqueueing afterwards throws
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    public void Dispose()
    {
        taskCollection.CompleteAdding();
    }

    /// <summary>
    /// Queues an action for background execution without a cancellation token.
    /// </summary>
    /// <param name="action_in">The action to run on a consumer.</param>
    /// <returns>A task representing the outcome of the action.</returns>
    public Task EnqueueTask(Action action_in)
    {
        return EnqueueTask(action_in, null);
    }

    /// <summary>
    /// Queues an action for background execution so the caller is not blocked by intense work.
    /// </summary>
    /// <param name="action">The action to run on a consumer.</param>
    /// <param name="cancelToken">
    /// Optional token; if cancellation has been requested by the time the item is dequeued,
    /// the action is skipped and the returned task is canceled.
    /// </param>
    /// <returns>
    /// A task that completes when the action finishes, faults with the exception the action
    /// threw, or is canceled.
    /// </returns>
    public Task EnqueueTask(Action action, CancellationToken? cancelToken)
    {
        // Run the caller's continuations asynchronously so they cannot execute inline on
        // (and stall) the consumer that completes the task.
        TaskCompletionSource<object> tcs =
            new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        taskCollection.Add(new WorkItem(tcs, action, cancelToken));
        return tcs.Task;
    }

    /// <summary>
    /// Consumer loop: takes work items until the collection is marked complete and drained,
    /// running each action and reporting its outcome through the item's completion source.
    /// </summary>
    private void Consume()
    {
        foreach (WorkItem item in taskCollection.GetConsumingEnumerable())
        {
            // Cancellation requested before the work started: skip the action entirely.
            if (item.CancelToken.IsCancellationRequested)
            {
                item.TaskSource.SetCanceled();
                continue;
            }

            try
            {
                item.Action();
                item.TaskSource.SetResult(null); // Indicate completion
            }
            catch (OperationCanceledException ex)
            {
                // Only a cancellation raised for this item's own token counts as "canceled";
                // any other OperationCanceledException is reported as a fault.
                if (ex.CancellationToken == item.CancelToken)
                {
                    item.TaskSource.SetCanceled();
                }
                else
                {
                    item.TaskSource.SetException(ex);
                }
            }
            // ReSharper disable once CatchAllClause
            catch (Exception ex)
            {
                // Surface the failure to the caller via the task instead of killing the consumer.
                item.TaskSource.SetException(ex);
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment