Skip to content

Instantly share code, notes, and snippets.

@FisherMS
Created September 7, 2016 09:19
Show Gist options
  • Save FisherMS/c5ca01ed643b856647c18b9fbbca058f to your computer and use it in GitHub Desktop.
Save FisherMS/c5ca01ed643b856647c18b9fbbca058f to your computer and use it in GitHub Desktop.
很多时候都需要开一些线程队列来处理一些非常密集轻量的工作或对一些任务处理进行有效的限制,这个时候一般都会想到开启队列和线程来处理.下面分享一个无锁的队列作业调度器.
/// <summary>
/// 作业调度器
/// </summary>
public class Dispatch
{
/// <summary>
/// 构建作业调度器
/// </summary>
public Dispatch()
{
System.Threading.ThreadPool.QueueUserWorkItem(OnRun);
}
private System.Threading.Semaphore mSemaphore = new System.Threading.Semaphore(0, 1);
private int mCounter = 0;
private System.Collections.Concurrent.ConcurrentQueue<ITack> mQueue = new System.Collections.Concurrent.ConcurrentQueue<ITack>();
private ITack GetCmd()
{
ITack result = null;
mQueue.TryDequeue(out result);
if (result != null)
System.Threading.Interlocked.Decrement(ref mCounter);
return result;
}
/// <summary>
/// 添加作业
/// </summary>
/// <param name="cmd">作业</param>
public void Add(ITack cmd)
{
if (System.Threading.Interlocked.CompareExchange(ref mCounter, 1, -1) == -1)
{
mSemaphore.Release();
}
else
{
System.Threading.Interlocked.Increment(ref mCounter);
}
mQueue.Enqueue(cmd);
}
private void OnRun(object state)
{
while (true)
{
ITack cmd = GetCmd();
if (cmd == null)
{
if (System.Threading.Interlocked.CompareExchange(ref mCounter, -1, 0) == 0)
{
mSemaphore.WaitOne();
}
}
else
{
try
{
cmd.Execute();
Console.WriteLine(cmd);
}
catch
{
}
}
}
}
/// <summary>
/// 作业描述接口
/// </summary>
public interface ITack
{
void Execute();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment