Skip to content

Instantly share code, notes, and snippets.

Created December 14, 2019 14:51
Show Gist options
  • Save RobertBouillon/5eca4e1e49977aad96e95b7345477d4c to your computer and use it in GitHub Desktop.
Save RobertBouillon/5eca4e1e49977aad96e95b7345477d4c to your computer and use it in GitHub Desktop.
C# Worker Model
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;
using System.Diagnostics;
namespace System.Threading
public abstract class Worker : System.Threading.Workers.IWorker
#region Fields
private Thread _workerThread;
private volatile bool _isStopping;
private volatile bool _isStarted;
private volatile bool _isWorking;
private TimeSpan _waitDelay = TimeSpan.FromMilliseconds(100);
private readonly string _name;
#region Properties
protected Thread WorkerThread => _workerThread;
public virtual string Name => _name;
public bool IsStopping
get { return _isStopping; }
protected set { _isStopping = value; }
public bool IsStarted
get { return _isStarted; }
protected set { _isStarted = value; }
public bool IsWorking
get { return _isWorking; }
protected set { _isWorking = value; }
public TimeSpan WaitDelay
get { return _waitDelay; }
protected set { _waitDelay = value; }
#region Constructors
public Worker(){}
protected Worker(string name)
#region Validation
if (name == null)
throw new ArgumentNullException("name");
_name = name;
#region Methods
protected virtual Thread CreateThread(String name)
Thread worker = new Thread(WorkerLoop);
worker.Name = _name;
worker.Priority = ThreadPriority.Normal;
worker.IsBackground = true;
return worker;
public bool Start()
#region Validation
//if (_internalInterval== TimeSpan.Zero)
// throw new InvalidOperationException("Interval must be greater than or equal to zero");
//if (_internalInterval.TotalMilliseconds <= 0)
// throw new InvalidOperationException("Interval must be a positive number");
if (_isStarted)
throw new InvalidOperationException("Cannot start worker: worker already started");
if (!OnStarting())
return false;
//Set here so no one can call Start again (save us having to declare and manage _isStarting)
_isStarted = true;
_workerThread = CreateThread(_name);
return true;
public bool Stop() => Stop(TimeSpan.FromSeconds(10), true);
public virtual bool Stop(TimeSpan timeout, bool kill)
#region Validation
if (!_isStarted)
throw new InvalidOperationException("Cannot stop worker: worker has not started.");
if (_isStopping)
throw new InvalidOperationException("Cannot stop worker: worker is stopping.");
if (!OnStopping())
return false;
_isStopping = true;
if (_isWorking && kill)
if (!_workerThread.Join(timeout))
if (kill)
OnError(new Exception(String.Format("Unable to stop worker: {0}", _name)));
_isStarted = false;
catch (Exception ex)
throw ex;
//Don't catch exception here. This happens on calling thread outside of the worker;
//OnError is in the context if the worker thread. We should bubble STOP exceptions
//to the calling thread.
_isStopping = false;
return true;
protected abstract void Work();
protected virtual void CancelWork() { }
protected virtual bool HasWork => true;
protected virtual void WaitForWork() => Thread.Sleep(_waitDelay);
protected virtual void WorkerLoop()
Stopwatch sw = new Stopwatch();
while (!_isStopping)
if (!HasWork)
_isWorking = true;
if (!OnWorking())
catch (Exception ex)
_isWorking = false;
#region Events
public event CancelEventHandler Starting;
protected bool OnStarting() => new CancelEventArgs(false).Invoke(OnStarting);
protected virtual void OnStarting(CancelEventArgs e) => Starting?.Invoke(this, e);
public event EventHandler Started;
protected void OnStarted() => OnStarted(EventArgs.Empty);
protected virtual void OnStarted(EventArgs e) => Started?.Invoke(this, e);
public event CancelEventHandler Stopping;
protected bool OnStopping() => new CancelEventArgs(false).Invoke(OnStopping);
protected virtual void OnStopping(CancelEventArgs e) => Stopping?.Invoke(this, e);
public event EventHandler Stopped;
protected void OnStopped() => OnStopped(EventArgs.Empty);
protected virtual void OnStopped(EventArgs e) => Stopped?.Invoke(this, e);
#region ErrorEventArgs Subclass
public class ErrorEventArgs : EventArgs
public Exception Exception { private set; get; }
internal ErrorEventArgs(Exception exception)
#region Validation
if (exception == null)
throw new ArgumentNullException("exception");
Exception = exception;
public event EventHandler<ErrorEventArgs> Error;
protected void OnError(Exception exception) => OnError(new ErrorEventArgs(exception));
protected virtual void OnError(ErrorEventArgs e) => Error?.Invoke(this, e);
#region WorkPerformedEventArgs Subclass
public class WorkPerformedEventArgs : EventArgs
public TimeSpan Duration { get; private set; }
internal WorkPerformedEventArgs(TimeSpan duration)
#region Validation
if (duration == null)
throw new ArgumentNullException("duration");
Duration = duration;
public event EventHandler<WorkPerformedEventArgs> Worked;
protected void OnWorked(TimeSpan duration) => OnWorked(new WorkPerformedEventArgs(duration));
protected virtual void OnWorked(WorkPerformedEventArgs e) => Worked?.Invoke(this, e);
public event CancelEventHandler Working;
protected bool OnWorking() => new CancelEventArgs(false).Invoke(OnWorking);
protected virtual void OnWorking(CancelEventArgs e) => Working?.Invoke(this, e);
namespace System.Threading.Workers
public abstract class EventWorker : Worker
protected abstract WaitHandle Handle { get; }
public EventWorker() { }
public EventWorker(string name) : base(name) { }
protected override void WaitForWork() => Handle.WaitOne(WaitDelay);
namespace System.Threading.Workers
public abstract class QueueWorker<T> : EventWorker, IEnumerable<T>
#region Fields
private IProducerConsumerCollection<T> _queue;
private AutoResetEvent _handle = new AutoResetEvent(false);
private T _item;
#region Constructors
public QueueWorker() => _queue = new ConcurrentQueue<T>();
public QueueWorker(IProducerConsumerCollection<T> queue)
#region Validation
if (queue == null)
throw new ArgumentNullException(nameof(queue));
_queue = queue;
public QueueWorker(string name) : this(name, new ConcurrentQueue<T>()){}
public QueueWorker(string name, IProducerConsumerCollection<T> queue) : base(name)
#region Validation
if (queue == null)
throw new ArgumentNullException(nameof(queue));
_queue = queue;
#region Methods
public virtual void Enqueue(T item)
// Just keep retrying
while (!_queue.TryAdd(item)) ;
#region Abstract Declarations
public abstract void Work(T item);
#region Overrides
protected override bool HasWork => _queue.Count > 0;
protected override WaitHandle Handle => _handle;
public void Flush()
while (_queue.Count > 0)
public override bool Stop(TimeSpan timeout, bool kill)
return base.Stop(timeout, kill);
protected override void Work()
if (_queue.TryTake(out T item))
Work(_item = item);
#region Events
#region WorkPerformedEventArgs Subclass
public class WorkPerformedEventArgs : EventArgs
#region Fields
private readonly TimeSpan _duration;
private readonly T _item;
#region Properties
public T Item
get { return _item; }
public TimeSpan Duration
get { return _duration; }
#region Constructors
internal WorkPerformedEventArgs(TimeSpan duration, T item)
#region Validation
if (duration == null)
throw new ArgumentNullException("duration");
if (item == null)
throw new ArgumentNullException("item");
_duration = duration;
_item = item;
public new event EventHandler<WorkPerformedEventArgs> Worked;
protected override void OnWorked(Worker.WorkPerformedEventArgs e)
Worked?.Invoke(this, new WorkPerformedEventArgs(e.Duration, _item));
public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => _queue.GetEnumerator();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment