Last active
December 20, 2015 03:09
-
-
Save yicone/6061531 to your computer and use it in GitHub Desktop.
AsyncActionManager (internal supply buffering)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace BufferingWorker | |
{ | |
public class AsyncActionManager<TActionParam> where TActionParam : class | |
{ | |
private BlockingCollection<ActionBag<TActionParam>> _q1 = new BlockingCollection<ActionBag<TActionParam>>(5000); | |
public AsyncActionManager() | |
{ | |
for (int i = 0; i < 5; i++) | |
{ | |
new Thread(() => | |
{ | |
while (!_q1.IsCompleted) | |
{ | |
var bag = _q1.Take(); | |
try | |
{ | |
bag.Action(bag.Param); | |
} | |
catch | |
{ | |
// todo: _q2.Add(bag); | |
} | |
} | |
}).Start(); | |
} | |
} | |
public void AddAction(Action<TActionParam> action, TActionParam param) | |
{ | |
_q1.Add(new ActionBag<TActionParam>(action, param)); | |
} | |
private class ActionBag<TParam> where TParam : class | |
{ | |
public ActionBag(Action<TParam> action, TParam param) | |
{ | |
this.Action = action; | |
this.Param = param; | |
} | |
public Action<TParam> Action { get; private set; } | |
public TParam Param { get; private set; } | |
} | |
} | |
public class Bar | |
{ | |
public Bar(string name) | |
{ | |
this.Name = name; | |
} | |
public string Name { get; private set; } | |
} | |
public class Foo | |
{ | |
public Foo(string name) | |
{ | |
this.Name = name; | |
} | |
public string Name { get; private set; } | |
} | |
internal class Program | |
{ | |
private static AsyncActionManager<Bar> s_barManager = new AsyncActionManager<Bar>(); | |
private static AsyncActionManager<Foo> s_fooManager = new AsyncActionManager<Foo>(); | |
private static void Main(string[] args) | |
{ | |
Action<Foo> printFoo = (foo) => { Console.WriteLine(string.Format("foo manager {0}. ThreadId: {1}", foo.Name, Thread.CurrentThread.ManagedThreadId)); }; | |
Action<Bar> printBar = (bar) => { Console.WriteLine(string.Format("boo manager {0}. ThreadId: {1}", bar.Name, Thread.CurrentThread.ManagedThreadId)); }; | |
s_fooManager.AddAction(printFoo, new Foo("first add")); | |
s_barManager.AddAction(printBar, new Bar("first add")); | |
for (int i = 1; i <= 100; i++) | |
{ | |
s_barManager.AddAction(printBar, new Bar(i.ToString())); | |
} | |
for (int i = 1; i <= 100; i++) | |
{ | |
s_fooManager.AddAction(printFoo, new Foo(i.ToString())); | |
} | |
Foo foo2 = new Foo("last add"); | |
s_fooManager.AddAction(printFoo, foo2); | |
s_barManager.AddAction(printBar, new Bar("last add")); | |
Console.WriteLine("begin? "); | |
Console.ReadKey(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections; | |
using System.Text; | |
using System.Threading; | |
using System.Configuration; | |
namespace Ctrip.AM.IPS.Frame.BaseLib.UpdateAsynManager | |
{ | |
public static class UpdateAsynManager | |
{ | |
/// <summary> | |
/// 保存线程引用 | |
/// </summary> | |
private static List<System.Threading.Thread> _Thread = null; | |
/// <summary> | |
/// 暂停标志 | |
/// </summary> | |
private static bool _stop = false; | |
/// <summary> | |
/// update队列 | |
/// </summary> | |
private static Queue _updateQueue = new Queue(5000,10); | |
/// <summary> | |
/// update重做队列 | |
/// </summary> | |
private static Queue _reUpdateQueue = new Queue(5000,10); | |
/// <summary> | |
/// 构造函数,完成相关初始化 | |
/// </summary> | |
static UpdateAsynManager() | |
{ | |
//初始化线程 | |
LaunchUpdateManager(); | |
} | |
//启动线程 | |
internal static void LaunchUpdateManager() | |
{ | |
_stop = false; | |
_Thread = new List<System.Threading.Thread>(); | |
//启动update处理线程 * 3 | |
for (int i = 0; i < 5; i++) | |
{ | |
_Thread.Add(new Thread(new ThreadStart(LogThreadProcess))); | |
} | |
//update重做处理线程 * 1 | |
_Thread.Add(new Thread(new ThreadStart(ReLogThreadProcess))); | |
//启动线程 | |
for (int i = 0; i < _Thread.Count; i++) | |
{ | |
_Thread[i].Start(); | |
} | |
} | |
//暂停线程 | |
internal static void PauseLogManager() | |
{ | |
_stop = true; | |
//删除对线程List的引用 | |
_Thread = null; | |
} | |
/// <summary> | |
/// 存入update队列 | |
/// </summary> | |
/// <param name="logEntity">update实体</param> | |
internal static void UpdateQueuEnQueue(UpdateAsynEntity updateAsynEntity) | |
{ | |
lock (_updateQueue) | |
{ | |
_updateQueue.Enqueue(updateAsynEntity); | |
} | |
} | |
/// <summary> | |
/// 从update队列中取出update实体 | |
/// </summary> | |
/// <returns></returns> | |
internal static UpdateAsynEntity UpdateQueuDeQueue() | |
{ | |
lock (_updateQueue) | |
{ | |
if (_updateQueue.Count > 0) | |
{ | |
return (UpdateAsynEntity)_updateQueue.Dequeue(); | |
} | |
else | |
{ | |
return null; | |
} | |
} | |
} | |
/// <summary> | |
/// 存入update重做队列 | |
/// </summary> | |
/// <param name="logEntity">update实体</param> | |
internal static void ReUpdateQueuEnQueue(UpdateAsynEntity updateAsynEntity) | |
{ | |
lock (_reUpdateQueue) | |
{ | |
_reUpdateQueue.Enqueue(updateAsynEntity); | |
} | |
} | |
/// <summary> | |
/// 将update实体由此函数传入,之后交由Manager类处理 | |
/// </summary> | |
/// <param name="LogEntity">日志实体</param> | |
public static void WriteToLocal(UpdateAsynEntity updateAsynEntity, IUpdateAsynProcess updateAsynProcess) | |
{ | |
if (updateAsynProcess == null) | |
{ | |
throw new Exception("请提供update操作实例!!"); | |
} | |
updateAsynEntity._updateAsynProcess = updateAsynProcess; | |
UpdateQueuEnQueue(updateAsynEntity); | |
} | |
public static void WriteToServer(UpdateAsynEntity updateAsynEntity, IUpdateAsynProcess updateAsynProcess) | |
{ | |
} | |
/// <summary> | |
/// update线程 | |
/// </summary> | |
private static void LogThreadProcess() | |
{ | |
//暂停标志 | |
while (!_stop) | |
{ | |
try | |
{ | |
//写入数据库 | |
UpdateAsynEntity updateAsynEntity = UpdateQueuDeQueue(); | |
if (updateAsynEntity != null) | |
{ | |
try | |
{ | |
updateAsynEntity._updateAsynProcess.Write(updateAsynEntity); | |
} | |
catch | |
{ | |
//报错则进入重做队列 | |
ReUpdateQueuEnQueue(updateAsynEntity); | |
} | |
} | |
} | |
catch | |
{ | |
} | |
Thread.Sleep(5); | |
} | |
} | |
/// <summary> | |
/// update重做线程 | |
/// </summary> | |
private static void ReLogThreadProcess() | |
{ | |
//暂停标志 | |
while (!_stop) | |
{ | |
try | |
{ | |
//锁定update重做队列 | |
lock (_reUpdateQueue) | |
{ | |
if (_reUpdateQueue.Count < 1000) | |
{ | |
//如果重发队列中小于1000条记录,取出update实体重做 | |
if (_reUpdateQueue.Count > 0) | |
{ | |
UpdateAsynEntity updateAsynEntity = (UpdateAsynEntity)_reUpdateQueue.Dequeue(); | |
try | |
{ | |
updateAsynEntity._updateAsynProcess.Write(updateAsynEntity); | |
} | |
catch | |
{ | |
//报错则再次进入重做队列 | |
_reUpdateQueue.Enqueue(updateAsynEntity); | |
} | |
} | |
} | |
else | |
{ | |
//如果重做队列中达到1000条记录,全部保存到Log4Net日志 | |
int count = 0; | |
StringBuilder sb = new StringBuilder(); | |
//string sendMessage; | |
while (_reUpdateQueue.Count > 0) | |
{ | |
UpdateAsynEntity updateEntity = (UpdateAsynEntity)_reUpdateQueue.Dequeue(); | |
if (count == 0) sb.AppendLine(""); | |
sb.AppendLine("@@@" + updateEntity._updateAsynProcess.GetLog(updateEntity)); | |
count++; | |
if (count == 100) | |
{ | |
//send email | |
SendEmail(sb.ToString()); | |
//WriteLog4Net(sb.ToString()); | |
count = 0; | |
//清空sb | |
sb = new StringBuilder(); | |
} | |
} | |
//不到100条的时候,如果有剩余也再做一次 | |
if (count != 0) | |
{ | |
SendEmail(sb.ToString()); | |
//WriteLog4Net(sb.ToString()); | |
} | |
} | |
} | |
} | |
catch | |
{ | |
} | |
Thread.Sleep(100); | |
} | |
} | |
/// <summary> | |
/// 发送邮件 | |
/// </summary> | |
private static void SendEmail(string emailContent) | |
{ | |
//EmailEntity emailEntity = new EmailEntity(); | |
//emailEntity.Recipient = ConfigurationManager.AppSettings["Recipient"]; | |
//emailEntity.RecipientName = ConfigurationManager.AppSettings["RecipientName"]; | |
////emailEntity.ScheduleTime | |
//emailEntity.ModuleType = "Monitor"; | |
//emailEntity.BodyContent = "<title>" + emailContent + "</title>"; | |
//emailEntity.Charset = "gb2312"; | |
//emailEntity.ContentType = "text/html"; | |
//emailEntity.DeadlineTime = DateTime.Now.AddYears(1); | |
//emailEntity.Importance = "0"; | |
//emailEntity.SenderName = "携程旅行网"; | |
//emailEntity.Sender = "monitor@ctrip.com"; | |
//emailEntity.SourceID = 0; | |
//emailEntity.Uid = ""; | |
//emailEntity.BusinessType = "Monitor"; | |
//emailEntity.BodyTemplateID = 29; | |
//emailEntity.Orderid = ""; | |
//emailEntity.Subject = "异步更新数据库异常"; | |
//SendEmailUtility seu = new SendEmailUtility(); | |
//seu.SendLogEmail(emailEntity); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace Ctrip.AM.IPS.Frame.CommLib | |
{ | |
/// <summary> | |
/// 异步update实体 | |
/// </summary> | |
public class UpdateAsynEntity | |
{ | |
private object _updateEntity; | |
/// <summary> | |
/// 构造函数 | |
/// </summary> | |
/// <param name="entity">update实体</param> | |
public UpdateAsynEntity(Object updateEntity) | |
{ | |
_updateEntity = updateEntity; | |
} | |
/// <summary> | |
/// update实体 | |
/// </summary> | |
public object UpdateEntity | |
{ | |
get { return _updateEntity; } | |
} | |
/// <summary> | |
/// update处理逻辑 | |
/// </summary> | |
internal IUpdateAsynProcess _updateAsynProcess; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Text; | |
namespace Ctrip.AM.IPS.Frame.BaseLib.Interface | |
{ | |
public interface IUpdateAsynProcess | |
{ | |
/// <summary> | |
/// udpate的操作函数 | |
/// </summary> | |
/// <param name="updateEntity"></param> | |
void Write(UpdateAsynEntity updateEntity); | |
/// <summary> | |
/// 如果update失败,在替代操作时取得内容 | |
/// </summary> | |
/// <param name="updateEntity"></param> | |
/// <returns></returns> | |
string GetLog(UpdateAsynEntity updateEntity); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment