Skip to content

Instantly share code, notes, and snippets.

@flysand7
Last active October 28, 2022 12:19
Show Gist options
  • Save flysand7/1877d5c56cf376640c651e9e8bfb2e65 to your computer and use it in GitHub Desktop.
Save flysand7/1877d5c56cf376640c651e9e8bfb2e65 to your computer and use it in GitHub Desktop.
Simple Work Queue in C#
/// <summary>
/// Callback invoked once for every item taken off a <see cref="WorkQueue{WorkItem}"/>.
/// </summary>
/// <typeparam name="WorkItem">Type of the queued items.</typeparam>
/// <param name="q">The item to process.</param>
public delegate void DoWorkFunc<WorkItem>(WorkItem q);

/// <summary>
/// Bounded work queue. <c>n_workers - 1</c> background threads are started by the
/// constructor; the thread that calls <see cref="WaitForCompletion"/> acts as the
/// remaining worker by processing items itself.
/// </summary>
/// <typeparam name="WorkItem">Type of the queued items.</typeparam>
public class WorkQueue<WorkItem> {
    Thread[] workers;
    DoWorkFunc<WorkItem> do_work_func;
    // Guarded by lock(work_items).
    Queue<WorkItem> work_items;
    // Counts enqueued items that no thread has claimed yet.
    Semaphore n_work_items;
    // Counts free slots; a slot is held from AddWork until the item has been processed.
    Semaphore n_free_items;
    CancellationTokenSource cts;

    /// <summary>
    /// Creates the queue and starts <c>n_workers - 1</c> background threads.
    /// </summary>
    /// <param name="n_workers">
    /// Total number of workers, including the calling thread. Also the queue capacity.
    /// </param>
    /// <param name="do_work_func">Callback executed for every queued item.</param>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="n_workers"/> is less than 1.</exception>
    /// <exception cref="System.ArgumentNullException"><paramref name="do_work_func"/> is null.</exception>
    public WorkQueue(int n_workers, DoWorkFunc<WorkItem> do_work_func) {
        if(n_workers < 1) {
            throw new System.ArgumentOutOfRangeException(
                "n_workers", n_workers, "At least one worker is required.");
        }
        if(do_work_func == null) {
            throw new System.ArgumentNullException("do_work_func");
        }
        this.do_work_func = do_work_func;
        n_work_items = new Semaphore(0, n_workers);
        n_free_items = new Semaphore(n_workers, n_workers);
        work_items = new Queue<WorkItem>(n_workers);
        workers = new Thread[n_workers-1];
        cts = new CancellationTokenSource();
        // Every field is assigned before the first thread starts running DoWork.
        for(int i = 0; i != workers.Length; ++i) {
            workers[i] = new Thread(DoWork);
            workers[i].Name = "THREAD " + (i+1);
            workers[i].Start();
        }
    }

    /// <summary>
    /// Worker loop: processes queued items until cancellation is signalled and no
    /// unclaimed item is left.
    /// </summary>
    public void DoWork() {
        WaitHandle[] handles = new WaitHandle[] {
            n_work_items,
            cts.Token.WaitHandle
        };
        for(;;) {
            // When both handles are signalled WaitAny reports the smallest index,
            // so pending work is always preferred over shutting down.
            int wait_result = WaitHandle.WaitAny(handles);
            if(wait_result == 1) {
                return;
            }
            RunClaimedItem();
        }
    }

    /// <summary>
    /// Enqueues an item, blocking while the queue is full.
    /// </summary>
    /// <remarks>
    /// When no background worker can drain the queue (single-worker queue, or
    /// <see cref="WaitForCompletion"/> has already stopped the workers) the calling
    /// thread processes queued items itself to make room instead of blocking forever.
    /// </remarks>
    /// <param name="work_item">The item to enqueue.</param>
    public void AddWork(WorkItem work_item) {
        if(workers.Length == 0 || cts.IsCancellationRequested) {
            WaitHandle[] handles = new WaitHandle[] {
                n_free_items,
                n_work_items
            };
            // Index 0: a free slot was acquired. Index 1: an item was claimed instead;
            // running it releases a slot, after which the wait is retried.
            while(WaitHandle.WaitAny(handles) == 1) {
                RunClaimedItem();
            }
        }
        else {
            n_free_items.WaitOne();
        }
        lock(work_items) {
            work_items.Enqueue(work_item);
        }
        // Signalled only after the item is visible in the queue.
        n_work_items.Release();
    }

    /// <summary>
    /// Helps process the remaining items on the calling thread, then stops the
    /// background workers and waits for them to finish their in-flight items.
    /// </summary>
    public void WaitForCompletion() {
        // Claim items without blocking and without holding the queue lock, so a worker
        // that has claimed an item but not yet dequeued it can never be starved.
        while(n_work_items.WaitOne(0)) {
            RunClaimedItem();
        }
        // Every enqueued item is claimed at this point; workers finish what they hold,
        // observe the cancellation and return from DoWork.
        cts.Cancel();
        foreach(Thread worker in workers) {
            // A worker calling this from inside its callback must not join itself.
            if(worker != Thread.CurrentThread) {
                worker.Join();
            }
        }
    }

    // Dequeues and processes one item. The caller must already have acquired
    // n_work_items, which guarantees the queue is not empty.
    void RunClaimedItem() {
        WorkItem item;
        lock(work_items) {
            item = work_items.Dequeue();
        }
        // The callback runs outside the lock so workers execute in parallel.
        try {
            do_work_func(item);
        }
        finally {
            // Give the slot back even if the callback throws.
            n_free_items.Release();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment