Created
January 2, 2019 21:33
-
-
Save arielmoraes/f0f913e377766c2588e5cbf5be3d727b to your computer and use it in GitHub Desktop.
Basic Queue Consumer with Threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QueueConsumer<T> | |
{ | |
readonly BlockingCollection<QueueConsumerItem<T>> queue; | |
readonly int numberOfThreads; | |
public QueueConsumer(int numberOfThreads, int capacity) | |
{ | |
queue = new BlockingCollection<QueueConsumerItem<T>>(capacity); | |
this.numberOfThreads = numberOfThreads; | |
} | |
public void Add(QueueConsumerItem<T> item) | |
{ | |
queue.Add(item); | |
} | |
public void Start(CancellationToken cancellationToken) | |
{ | |
Task.Run(() => InnerStart(cancellationToken)); | |
} | |
public void Start() | |
{ | |
Task.Run(() => InnerStart(CancellationToken.None)); | |
} | |
private void InnerStart(CancellationToken cancellationToken) | |
{ | |
var tasks = new Task[numberOfThreads]; | |
for (int i = 0; i < numberOfThreads; i++) | |
{ | |
tasks[i] = Task.Run(() => Consume(cancellationToken)); | |
} | |
Task.WhenAll(tasks).GetAwaiter().GetResult(); | |
} | |
private void Consume(CancellationToken cancellationToken) | |
{ | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
try | |
{ | |
var item = queue.Take(); | |
item.Action(); | |
item.EndCallback(item.Key); | |
} | |
catch (InvalidOperationException) | |
{ | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QueueConsumerItem<T> | |
{ | |
public QueueConsumerItem(T key, Action action) | |
{ | |
Key = key; | |
Action = action; | |
} | |
public QueueConsumerItem(T key, Action action, Action<T> endCallback) | |
: this(key, action) | |
{ | |
EndCallback = endCallback; | |
} | |
public T Key { get; private set; } | |
public Action Action { get; private set; } | |
public Action<T> EndCallback { get; private set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment