Skip to content

Instantly share code, notes, and snippets.

@naraga
Created September 18, 2012 22:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save naraga/3746240 to your computer and use it in GitHub Desktop.
Save naraga/3746240 to your computer and use it in GitHub Desktop.
Producer / Consumer queue optimized for IO bound operations
class Program
{
// PcQueue - demo
// Code copied from "C# 5.0 in a Nutshell: The Definitive Reference" by Joseph Albahari and Ben Albahari
// Modified by @borisbucha to efficiently support IO bound operations (simply adding TaskCreationOptions.LongRunning to consumer tasks)
// which sugests to scheduler to allocate dedicated thread to enqued items instead of poluting Threadpool.
static void Main()
{
Console.WriteLine("press any key when ready");
Console.ReadLine();
var pcq = new PcQueue(10);
for (int i = 0; i < 1000; i++)
{
int i1 = i;
pcq.Enqueue(() =>
{
Thread.Sleep(1000);
Console.WriteLine(i1);
});
}
Console.WriteLine("*************** press any key to finish ******************");
Console.ReadLine();
}
}
public class PcQueue : IDisposable
{
private readonly BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
public PcQueue(int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
}
public Task Enqueue(Action action, CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task(action, cancelToken);
_taskQ.Add(task);
return task;
}
public Task<TResult> Enqueue<TResult>(Func<TResult> func,
CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task<TResult>(func, cancelToken);
_taskQ.Add(task);
return task;
}
private void Consume()
{
foreach (var task in _taskQ.GetConsumingEnumerable())
try
{
if (!task.IsCanceled) task.RunSynchronously();
}
catch (InvalidOperationException)// Race condition
{
}
}
public void Dispose()
{
_taskQ.CompleteAdding();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment