Skip to content

Instantly share code, notes, and snippets.

@yicone
Last active December 20, 2015 03:09
Show Gist options
  • Save yicone/6061531 to your computer and use it in GitHub Desktop.
Save yicone/6061531 to your computer and use it in GitHub Desktop.
AsyncActionManager (internal supply buffering)
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace BufferingWorker
{
/// <summary>
/// Buffers actions in a bounded queue and executes them on a fixed set of
/// dedicated worker threads that are started by the constructor.
/// </summary>
/// <typeparam name="TActionParam">Type of the argument passed to each queued action.</typeparam>
public class AsyncActionManager<TActionParam> where TActionParam : class
{
    /// <summary>Maximum number of pending actions; <see cref="AddAction"/> blocks while the queue is full.</summary>
    private const int QueueCapacity = 5000;

    /// <summary>Number of worker threads draining the queue.</summary>
    private const int WorkerCount = 5;

    private readonly BlockingCollection<ActionBag<TActionParam>> _q1 =
        new BlockingCollection<ActionBag<TActionParam>>(QueueCapacity);

    /// <summary>
    /// Creates the manager and immediately starts its worker threads.
    /// The threads are started without setting IsBackground, and they only
    /// finish once <see cref="CompleteAdding"/> has been called and the queue is empty.
    /// </summary>
    public AsyncActionManager()
    {
        for (int i = 0; i < WorkerCount; i++)
        {
            new Thread(ProcessQueue).Start();
        }
    }

    /// <summary>
    /// Queues <paramref name="action"/> to be invoked with <paramref name="param"/>
    /// on one of the worker threads. Blocks while the queue is at capacity.
    /// </summary>
    /// <param name="action">Delegate to run; must not be null.</param>
    /// <param name="param">Argument handed to the delegate (passed through unchecked).</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><see cref="CompleteAdding"/> was already called.</exception>
    public void AddAction(Action<TActionParam> action, TActionParam param)
    {
        // A null delegate can never run; reject it here instead of letting it
        // fail silently inside a worker's catch block.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        _q1.Add(new ActionBag<TActionParam>(action, param));
    }

    /// <summary>
    /// Signals that no further actions will be added. Workers finish the
    /// actions that are already queued and then exit.
    /// </summary>
    public void CompleteAdding()
    {
        _q1.CompleteAdding();
    }

    /// <summary>Worker loop: runs queued actions until the queue is completed and drained.</summary>
    private void ProcessQueue()
    {
        // GetConsumingEnumerable ends cleanly on completion. The previous
        // "while (!IsCompleted) Take()" form could throw InvalidOperationException
        // when the queue completed between the check and the Take call.
        foreach (ActionBag<TActionParam> bag in _q1.GetConsumingEnumerable())
        {
            try
            {
                bag.Action(bag.Param);
            }
            catch
            {
                // A failing action must not kill the worker thread.
                // todo: _q2.Add(bag);
            }
        }
    }

    /// <summary>Pairs a delegate with the argument it will be invoked with.</summary>
    private class ActionBag<TParam> where TParam : class
    {
        public ActionBag(Action<TParam> action, TParam param)
        {
            this.Action = action;
            this.Param = param;
        }

        public Action<TParam> Action { get; private set; }

        public TParam Param { get; private set; }
    }
}
/// <summary>
/// Sample payload for the demo program: an object that only carries a name
/// fixed at construction time.
/// </summary>
public class Bar
{
    private string _name;

    /// <summary>Creates a Bar with the given name (stored as-is, no validation).</summary>
    /// <param name="name">Value later returned by <see cref="Name"/>.</param>
    public Bar(string name)
    {
        _name = name;
    }

    /// <summary>The name supplied to the constructor.</summary>
    public string Name
    {
        get { return _name; }
    }
}
/// <summary>
/// Sample payload for the demo program: an object that only carries a name
/// fixed at construction time.
/// </summary>
public class Foo
{
    private string _name;

    /// <summary>Creates a Foo with the given name (stored as-is, no validation).</summary>
    /// <param name="name">Value later returned by <see cref="Name"/>.</param>
    public Foo(string name)
    {
        _name = name;
    }

    /// <summary>The name supplied to the constructor.</summary>
    public string Name
    {
        get { return _name; }
    }
}
/// <summary>
/// Console demo: pushes a batch of print actions through one
/// AsyncActionManager per payload type and then waits for a key press.
/// </summary>
internal class Program
{
    private static AsyncActionManager<Bar> s_barManager = new AsyncActionManager<Bar>();
    private static AsyncActionManager<Foo> s_fooManager = new AsyncActionManager<Foo>();

    /// <summary>
    /// Queues a "first add" item, 100 numbered items and a "last add" item on
    /// each manager, prints the prompt, and blocks until a key is pressed.
    /// </summary>
    private static void Main(string[] args)
    {
        Action<Foo> fooPrinter = PrintFoo;
        Action<Bar> barPrinter = PrintBar;

        s_fooManager.AddAction(fooPrinter, new Foo("first add"));
        s_barManager.AddAction(barPrinter, new Bar("first add"));

        // Numbered bars are queued before numbered foos.
        int barNumber = 1;
        while (barNumber <= 100)
        {
            s_barManager.AddAction(barPrinter, new Bar(barNumber.ToString()));
            barNumber++;
        }

        int fooNumber = 1;
        while (fooNumber <= 100)
        {
            s_fooManager.AddAction(fooPrinter, new Foo(fooNumber.ToString()));
            fooNumber++;
        }

        s_fooManager.AddAction(fooPrinter, new Foo("last add"));
        s_barManager.AddAction(barPrinter, new Bar("last add"));

        Console.WriteLine("begin? ");
        Console.ReadKey();
    }

    /// <summary>Writes the Foo's name and the id of the executing thread to the console.</summary>
    private static void PrintFoo(Foo foo)
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        string line = string.Format("foo manager {0}. ThreadId: {1}", foo.Name, threadId);
        Console.WriteLine(line);
    }

    /// <summary>Writes the Bar's name and the id of the executing thread to the console.</summary>
    private static void PrintBar(Bar bar)
    {
        int threadId = Thread.CurrentThread.ManagedThreadId;
        string line = string.Format("boo manager {0}. ThreadId: {1}", bar.Name, threadId);
        Console.WriteLine(line);
    }
}
}
using System;
using System.Collections.Generic;
using System.Collections;
using System.Text;
using System.Threading;
using System.Configuration;
namespace Ctrip.AM.IPS.Frame.BaseLib.UpdateAsynManager
{
/// <summary>
/// Static dispatcher that performs updates asynchronously. Entities handed to
/// <see cref="WriteToLocal"/> are placed on an update queue and written by a
/// set of worker threads; entities whose write throws are moved to a redo
/// queue that a single extra thread retries. When the redo queue reaches
/// <see cref="RedoOverflowThreshold"/> entries it is emptied and its contents
/// are passed to <see cref="SendEmail"/> in batches instead of being retried.
/// </summary>
public static class UpdateAsynManager
{
    /// <summary>Number of threads consuming the update queue.</summary>
    private const int UpdateWorkerCount = 5;

    /// <summary>Redo-queue size at which retrying stops and the queue is reported instead.</summary>
    private const int RedoOverflowThreshold = 1000;

    /// <summary>Number of log entries combined into one report message.</summary>
    private const int ReportBatchSize = 100;

    /// <summary>
    /// References to the threads started by <see cref="LaunchUpdateManager"/>.
    /// </summary>
    private static List<System.Threading.Thread> _Thread = null;

    /// <summary>
    /// Stop flag polled by every worker loop. Volatile so that a value written
    /// by <see cref="PauseLogManager"/> is observed by the worker threads.
    /// </summary>
    private static volatile bool _stop = false;

    /// <summary>
    /// Queue of pending updates; also used as the lock object guarding itself.
    /// </summary>
    private static readonly Queue _updateQueue = new Queue(5000, 10);

    /// <summary>
    /// Queue of updates whose write failed and must be redone; also used as
    /// the lock object guarding itself.
    /// </summary>
    private static readonly Queue _reUpdateQueue = new Queue(5000, 10);

    /// <summary>
    /// Type initializer: starts the worker threads on first use of the class.
    /// </summary>
    static UpdateAsynManager()
    {
        LaunchUpdateManager();
    }

    /// <summary>
    /// Clears the stop flag, then creates and starts the update worker threads
    /// plus the single redo thread.
    /// </summary>
    /// <remarks>
    /// NOTE(review): if this is called soon after <see cref="PauseLogManager"/>,
    /// workers from the previous launch that have not yet seen the stop flag
    /// will keep running next to the new ones - confirm callers allow for that.
    /// </remarks>
    internal static void LaunchUpdateManager()
    {
        _stop = false;
        _Thread = new List<System.Threading.Thread>();

        // Update worker threads.
        for (int i = 0; i < UpdateWorkerCount; i++)
        {
            _Thread.Add(new Thread(new ThreadStart(LogThreadProcess)));
        }

        // One thread dedicated to the redo queue.
        _Thread.Add(new Thread(new ThreadStart(ReLogThreadProcess)));

        for (int i = 0; i < _Thread.Count; i++)
        {
            _Thread[i].Start();
        }
    }

    /// <summary>
    /// Asks all worker loops to stop and drops the references to their threads.
    /// The threads are not joined; each one leaves its loop the next time it
    /// checks the stop flag.
    /// </summary>
    internal static void PauseLogManager()
    {
        _stop = true;
        _Thread = null;
    }

    /// <summary>
    /// Adds an entity to the update queue.
    /// </summary>
    /// <param name="updateAsynEntity">Entity to enqueue.</param>
    internal static void UpdateQueuEnQueue(UpdateAsynEntity updateAsynEntity)
    {
        lock (_updateQueue)
        {
            _updateQueue.Enqueue(updateAsynEntity);
        }
    }

    /// <summary>
    /// Removes the next entity from the update queue.
    /// </summary>
    /// <returns>The dequeued entity, or null when the queue is empty.</returns>
    internal static UpdateAsynEntity UpdateQueuDeQueue()
    {
        lock (_updateQueue)
        {
            if (_updateQueue.Count > 0)
            {
                return (UpdateAsynEntity)_updateQueue.Dequeue();
            }

            return null;
        }
    }

    /// <summary>
    /// Adds an entity to the redo queue.
    /// </summary>
    /// <param name="updateAsynEntity">Entity whose update has to be retried.</param>
    internal static void ReUpdateQueuEnQueue(UpdateAsynEntity updateAsynEntity)
    {
        lock (_reUpdateQueue)
        {
            _reUpdateQueue.Enqueue(updateAsynEntity);
        }
    }

    /// <summary>
    /// Entry point for callers: attaches the handler to the entity and queues
    /// the entity so a worker thread performs the update later.
    /// </summary>
    /// <param name="updateAsynEntity">Entity to update; must not be null.</param>
    /// <param name="updateAsynProcess">Handler that performs the update; must not be null.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public static void WriteToLocal(UpdateAsynEntity updateAsynEntity, IUpdateAsynProcess updateAsynProcess)
    {
        if (updateAsynProcess == null)
        {
            throw new ArgumentNullException("updateAsynProcess", "请提供update操作实例!!");
        }

        // Previously a null entity surfaced as a NullReferenceException on the
        // assignment below.
        if (updateAsynEntity == null)
        {
            throw new ArgumentNullException("updateAsynEntity");
        }

        updateAsynEntity._updateAsynProcess = updateAsynProcess;
        UpdateQueuEnQueue(updateAsynEntity);
    }

    /// <summary>
    /// Not implemented: currently does nothing with its arguments.
    /// </summary>
    public static void WriteToServer(UpdateAsynEntity updateAsynEntity, IUpdateAsynProcess updateAsynProcess)
    {
    }

    /// <summary>
    /// Update worker loop: takes one entity at a time from the update queue
    /// and calls its handler's Write; an entity whose Write throws is moved to
    /// the redo queue. Sleeps 5 ms between iterations.
    /// </summary>
    private static void LogThreadProcess()
    {
        while (!_stop)
        {
            try
            {
                UpdateAsynEntity updateAsynEntity = UpdateQueuDeQueue();
                if (updateAsynEntity != null)
                {
                    try
                    {
                        updateAsynEntity._updateAsynProcess.Write(updateAsynEntity);
                    }
                    catch
                    {
                        // The write failed: hand the entity to the redo thread.
                        ReUpdateQueuEnQueue(updateAsynEntity);
                    }
                }
            }
            catch (Exception ex)
            {
                // Keep the worker alive, but do not hide the failure completely.
                System.Diagnostics.Trace.TraceError("UpdateAsynManager update worker error: " + ex);
            }

            Thread.Sleep(5);
        }
    }

    /// <summary>
    /// Redo worker loop. While the redo queue holds fewer than
    /// <see cref="RedoOverflowThreshold"/> entries, retries one entity per
    /// iteration; otherwise empties the queue and reports its contents.
    /// Sleeps 100 ms between iterations.
    /// </summary>
    private static void ReLogThreadProcess()
    {
        while (!_stop)
        {
            try
            {
                UpdateAsynEntity retryEntity = null;
                List<UpdateAsynEntity> abandoned = null;

                // Only touch the queue while holding the lock. The handler
                // calls happen after it is released, so update workers that
                // need to enqueue a failure are not blocked behind a slow
                // Write or report.
                lock (_reUpdateQueue)
                {
                    if (_reUpdateQueue.Count < RedoOverflowThreshold)
                    {
                        if (_reUpdateQueue.Count > 0)
                        {
                            retryEntity = (UpdateAsynEntity)_reUpdateQueue.Dequeue();
                        }
                    }
                    else
                    {
                        abandoned = new List<UpdateAsynEntity>(_reUpdateQueue.Count);
                        while (_reUpdateQueue.Count > 0)
                        {
                            abandoned.Add((UpdateAsynEntity)_reUpdateQueue.Dequeue());
                        }
                    }
                }

                if (retryEntity != null)
                {
                    RetryUpdate(retryEntity);
                }
                else if (abandoned != null)
                {
                    ReportAbandonedUpdates(abandoned);
                }
            }
            catch (Exception ex)
            {
                // Keep the redo thread alive, but do not hide the failure completely.
                System.Diagnostics.Trace.TraceError("UpdateAsynManager redo worker error: " + ex);
            }

            Thread.Sleep(100);
        }
    }

    /// <summary>
    /// Runs the handler's Write once more; on failure the entity goes back on the redo queue.
    /// </summary>
    private static void RetryUpdate(UpdateAsynEntity updateAsynEntity)
    {
        try
        {
            updateAsynEntity._updateAsynProcess.Write(updateAsynEntity);
        }
        catch
        {
            ReUpdateQueuEnQueue(updateAsynEntity);
        }
    }

    /// <summary>
    /// Builds report messages from the GetLog text of each abandoned entity,
    /// <see cref="ReportBatchSize"/> entries per message, and passes each
    /// message to <see cref="SendEmail"/>.
    /// </summary>
    private static void ReportAbandonedUpdates(List<UpdateAsynEntity> abandoned)
    {
        int count = 0;
        StringBuilder sb = new StringBuilder();

        foreach (UpdateAsynEntity updateEntity in abandoned)
        {
            try
            {
                string logText = updateEntity._updateAsynProcess.GetLog(updateEntity);

                // Every message starts with an empty line.
                if (count == 0)
                {
                    sb.AppendLine("");
                }

                sb.AppendLine("@@@" + logText);
                count++;

                if (count == ReportBatchSize)
                {
                    // Reset the batch before sending so that a failing send
                    // cannot leave the counter stuck past the batch size.
                    string report = sb.ToString();
                    count = 0;
                    sb = new StringBuilder();
                    SendEmail(report);
                    //WriteLog4Net(report);
                }
            }
            catch (Exception ex)
            {
                // One bad entity or failed send must not discard the rest.
                System.Diagnostics.Trace.TraceError("UpdateAsynManager report error: " + ex);
            }
        }

        // Send whatever is left when the total is not a multiple of the batch size.
        if (count != 0)
        {
            SendEmail(sb.ToString());
            //WriteLog4Net(sb.ToString());
        }
    }

    /// <summary>
    /// Intended to e-mail the given report text.
    /// </summary>
    /// <param name="emailContent">Report text built from the abandoned updates.</param>
    /// <remarks>
    /// NOTE(review): the whole implementation is commented out, so this method
    /// currently does nothing and the report text is discarded.
    /// </remarks>
    private static void SendEmail(string emailContent)
    {
        //EmailEntity emailEntity = new EmailEntity();
        //emailEntity.Recipient = ConfigurationManager.AppSettings["Recipient"];
        //emailEntity.RecipientName = ConfigurationManager.AppSettings["RecipientName"];
        ////emailEntity.ScheduleTime
        //emailEntity.ModuleType = "Monitor";
        //emailEntity.BodyContent = "<title>" + emailContent + "</title>";
        //emailEntity.Charset = "gb2312";
        //emailEntity.ContentType = "text/html";
        //emailEntity.DeadlineTime = DateTime.Now.AddYears(1);
        //emailEntity.Importance = "0";
        //emailEntity.SenderName = "携程旅行网";   // sender display name: "Ctrip Travel Network"
        //emailEntity.Sender = "monitor@ctrip.com";
        //emailEntity.SourceID = 0;
        //emailEntity.Uid = "";
        //emailEntity.BusinessType = "Monitor";
        //emailEntity.BodyTemplateID = 29;
        //emailEntity.Orderid = "";
        //emailEntity.Subject = "异步更新数据库异常";   // subject: "asynchronous database update failure"
        //SendEmailUtility seu = new SendEmailUtility();
        //seu.SendLogEmail(emailEntity);
    }
}
}
using System;
namespace Ctrip.AM.IPS.Frame.CommLib
{
/// <summary>
/// 异步update实体
/// </summary>
/// <summary>
/// Envelope for one asynchronous update: wraps the caller's payload object
/// together with the handler that will process it.
/// </summary>
public class UpdateAsynEntity
{
    /// <summary>
    /// Handler holding the update logic for this entity. Not set by the
    /// constructor; it is assigned by code in the same assembly.
    /// </summary>
    internal IUpdateAsynProcess _updateAsynProcess;

    /// <summary>
    /// Wraps the given payload.
    /// </summary>
    /// <param name="updateEntity">Payload to be updated; stored as-is.</param>
    public UpdateAsynEntity(Object updateEntity)
    {
        this.UpdateEntity = updateEntity;
    }

    /// <summary>
    /// The payload passed to the constructor.
    /// </summary>
    public object UpdateEntity { get; private set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Ctrip.AM.IPS.Frame.BaseLib.Interface
{
/// <summary>
/// Handler for asynchronously processed update entities: one method that
/// performs the update and one that supplies log text for an entity.
/// </summary>
public interface IUpdateAsynProcess
{
/// <summary>
/// Performs the update operation for the given entity.
/// </summary>
/// <param name="updateEntity">Entity to update.</param>
void Write(UpdateAsynEntity updateEntity);
/// <summary>
/// Returns the content to use for the fallback action taken when the
/// update has failed.
/// </summary>
/// <param name="updateEntity">Entity whose update failed.</param>
/// <returns>Text describing the entity for the fallback log/report.</returns>
string GetLog(UpdateAsynEntity updateEntity);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment