Skip to content

Instantly share code, notes, and snippets.

@co89757
Last active November 27, 2018 07:58
Show Gist options
  • Save co89757/6618509d5820aaa07a8767abeb07b6b4 to your computer and use it in GitHub Desktop.
Save co89757/6618509d5820aaa07a8767abeb07b6b4 to your computer and use it in GitHub Desktop.
A simple abstract class for producer-consumer (P-C) model processing
namespace DEMon
{
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Abstract base class that facilitates the construction of processors following the
    /// producer-consumer pattern. Producers enqueue work items with <see cref="Add"/>; a
    /// background task drains the queue and hands every item to <see cref="Process"/>,
    /// running up to the configured number of items in parallel.
    /// </summary>
    /// <remarks>
    /// Typical invocation sequence:
    /// <code>
    /// processor.Init();
    /// processor.Add(workItem);
    /// processor.CompleteAndWait();
    /// </code>
    /// The background task does not start consuming until <see cref="Init"/> has been called.
    /// </remarks>
    /// <typeparam name="T">Type of the work item.</typeparam>
    public abstract class ProducerComsumer<T>
    {
        /// <summary>Maximum number of pending items the queue holds before <see cref="Add"/> blocks.</summary>
        public const int QueueCapacity = 87352;

        private readonly BlockingCollection<T> queue;
        private readonly ManualResetEvent initFinished;
        private readonly ManualResetEvent doneUpload;

        // Kept so that a failure on the consumer side can be re-thrown to the caller
        // of CompleteAndWait instead of being lost on the thread pool.
        private readonly Task worker;

        /// <summary>
        /// Gets a value indicating whether <see cref="CompleteAndWait"/> has returned
        /// successfully, i.e. every queued item has been processed.
        /// </summary>
        public bool Done { get; private set; }

        /// <summary>
        /// Creates the bounded queue and starts the background consumer task. The task
        /// waits for <see cref="Init"/> before it begins to process items.
        /// </summary>
        /// <param name="maxParallism">
        /// Maximum number of items processed concurrently (passed to
        /// <see cref="ParallelOptions.MaxDegreeOfParallelism"/>).
        /// </param>
        protected ProducerComsumer(int maxParallism = 64)
        {
            // Built here rather than inside the background task so that an invalid
            // degree of parallelism fails fast in the constructor.
            var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallism };

            queue = new BlockingCollection<T>(QueueCapacity);
            doneUpload = new ManualResetEvent(false);
            initFinished = new ManualResetEvent(false);
            Done = false;

            worker = Task.Run(() =>
            {
                try
                {
                    initFinished.WaitOne();

                    // NoBuffering: the default partitioner pulls items in chunks, which can
                    // leave already-queued items unprocessed until more items arrive.
                    Partitioner<T> source = Partitioner.Create(
                        queue.GetConsumingEnumerable(),
                        EnumerablePartitionerOptions.NoBuffering);

                    Parallel.ForEach(source, options, Process);
                }
                finally
                {
                    // Always signal completion; otherwise an exception thrown by Process
                    // would leave CompleteAndWait blocked forever.
                    doneUpload.Set();
                }
            });
        }

        /// <summary>Handles a single work item. May be invoked concurrently from several threads.</summary>
        /// <param name="item">The work item taken from the queue.</param>
        protected abstract void Process(T item);

        /// <summary>
        /// Enqueues a work item. Blocks while the queue already holds
        /// <see cref="QueueCapacity"/> items.
        /// </summary>
        /// <param name="item">The work item to enqueue.</param>
        public void Add(T item)
        {
            queue.Add(item);
        }

        /// <summary>One-time setup hook executed by <see cref="Init"/> before consumption starts.</summary>
        protected abstract void Initialize();

        /// <summary>
        /// Runs <see cref="Initialize"/> and then releases the background consumer so it
        /// starts processing queued items.
        /// </summary>
        public void Init()
        {
            Initialize();
            initFinished.Set();
        }

        /// <summary>
        /// Marks the queue as complete for adding and blocks until the background consumer
        /// has finished. <see cref="Done"/> is set to <c>true</c> on success.
        /// </summary>
        /// <remarks>
        /// Any exception raised while processing items is re-thrown here, and
        /// <see cref="Done"/> stays <c>false</c> in that case. This call blocks until
        /// <see cref="Init"/> has been invoked, because the consumer does not start before then.
        /// </remarks>
        public void CompleteAndWait()
        {
            // CompleteAdding is idempotent, so no IsAddingCompleted check is needed.
            queue.CompleteAdding();
            doneUpload.WaitOne();

            // Surfaces a consumer-side failure to the caller; returns immediately on success.
            worker.GetAwaiter().GetResult();
            Done = true;
        }

        /// <summary>Marks the queue as complete for adding without waiting for the consumer.</summary>
        public void CompleteAdding()
        {
            queue.CompleteAdding();
        }

        /// <summary>Returns the number of items currently waiting in the queue.</summary>
        /// <returns>The current queue length.</returns>
        public long QueueSize()
        {
            return queue.Count;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment