Created
September 18, 2012 22:08
-
-
Save naraga/3746240 to your computer and use it in GitHub Desktop.
Producer / Consumer queue optimized for IO bound operations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Program | |
{ | |
// PcQueue - demo | |
// Code copied from "C# 5.0 in a Nutshell: The Definitive Reference" by Joseph Albahari and Ben Albahari | |
// Modified by @borisbucha to efficiently support IO bound operations (simply adding TaskCreationOptions.LongRunning to consumer tasks) | |
// which sugests to scheduler to allocate dedicated thread to enqued items instead of poluting Threadpool. | |
static void Main() | |
{ | |
Console.WriteLine("press any key when ready"); | |
Console.ReadLine(); | |
var pcq = new PcQueue(10); | |
for (int i = 0; i < 1000; i++) | |
{ | |
int i1 = i; | |
pcq.Enqueue(() => | |
{ | |
Thread.Sleep(1000); | |
Console.WriteLine(i1); | |
}); | |
} | |
Console.WriteLine("*************** press any key to finish ******************"); | |
Console.ReadLine(); | |
} | |
} | |
public class PcQueue : IDisposable | |
{ | |
private readonly BlockingCollection<Task> _taskQ = new BlockingCollection<Task>(); | |
public PcQueue(int workerCount) | |
{ | |
// Create and start a separate Task for each consumer: | |
for (int i = 0; i < workerCount; i++) | |
Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning); | |
} | |
public Task Enqueue(Action action, CancellationToken cancelToken = default (CancellationToken)) | |
{ | |
var task = new Task(action, cancelToken); | |
_taskQ.Add(task); | |
return task; | |
} | |
public Task<TResult> Enqueue<TResult>(Func<TResult> func, | |
CancellationToken cancelToken = default (CancellationToken)) | |
{ | |
var task = new Task<TResult>(func, cancelToken); | |
_taskQ.Add(task); | |
return task; | |
} | |
private void Consume() | |
{ | |
foreach (var task in _taskQ.GetConsumingEnumerable()) | |
try | |
{ | |
if (!task.IsCanceled) task.RunSynchronously(); | |
} | |
catch (InvalidOperationException)// Race condition | |
{ | |
} | |
} | |
public void Dispose() | |
{ | |
_taskQ.CompleteAdding(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment