Skip to content

Instantly share code, notes, and snippets.

@arielmoraes
Created January 2, 2019 21:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arielmoraes/f0f913e377766c2588e5cbf5be3d727b to your computer and use it in GitHub Desktop.
Save arielmoraes/f0f913e377766c2588e5cbf5be3d727b to your computer and use it in GitHub Desktop.
Basic Queue Consumer with Threads
public class QueueConsumer<T>
{
readonly BlockingCollection<QueueConsumerItem<T>> queue;
readonly int numberOfThreads;
public QueueConsumer(int numberOfThreads, int capacity)
{
queue = new BlockingCollection<QueueConsumerItem<T>>(capacity);
this.numberOfThreads = numberOfThreads;
}
public void Add(QueueConsumerItem<T> item)
{
queue.Add(item);
}
public void Start(CancellationToken cancellationToken)
{
Task.Run(() => InnerStart(cancellationToken));
}
public void Start()
{
Task.Run(() => InnerStart(CancellationToken.None));
}
private void InnerStart(CancellationToken cancellationToken)
{
var tasks = new Task[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
tasks[i] = Task.Run(() => Consume(cancellationToken));
}
Task.WhenAll(tasks).GetAwaiter().GetResult();
}
private void Consume(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var item = queue.Take();
item.Action();
item.EndCallback(item.Key);
}
catch (InvalidOperationException)
{
}
}
}
}
public class QueueConsumerItem<T>
{
public QueueConsumerItem(T key, Action action)
{
Key = key;
Action = action;
}
public QueueConsumerItem(T key, Action action, Action<T> endCallback)
: this(key, action)
{
EndCallback = endCallback;
}
public T Key { get; private set; }
public Action Action { get; private set; }
public Action<T> EndCallback { get; private set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment