Created
July 10, 2017 07:34
-
-
Save fabercs/e7d1a31ad292ba5ce66e158ab5fd7d3c to your computer and use it in GitHub Desktop.
Queue Processor (TPL One Producer/Multiple Consumers)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Configuration; | |
using System.Data; | |
using System.Data.SqlClient; | |
using System.Diagnostics; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
namespace ServiceBrokerQueueProcessorTPL | |
{ | |
public static class QueueProcessor | |
{ | |
public static ActionBlock<QueueMessage> notifQueue { get; set; } | |
private static ExecutionDataflowBlockOptions processBlockOptions | |
{ | |
get | |
{ | |
return new ExecutionDataflowBlockOptions | |
{ | |
MaxDegreeOfParallelism = 4, | |
BoundedCapacity = 8 | |
}; | |
} | |
} | |
public static bool Stopped { get; private set; } | |
static WsLogger Logger { get; set; } | |
static QueueProcessor() | |
{ | |
notifQueue = new ActionBlock<QueueMessage>((item) => Process(item), | |
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); | |
Stopped = false; | |
Logger = new WsLogger(); | |
} | |
public static void Start() | |
{ | |
try | |
{ | |
while (!Stopped) | |
{ | |
Produce(); | |
} | |
} | |
catch (Exception ex) | |
{ | |
throw ex; | |
} | |
} | |
private static void Produce() | |
{ | |
using (var connection = new SqlConnection(ConfigurationManager.AppSettings["SqlConnection"])) | |
{ | |
if (connection.State != System.Data.ConnectionState.Open) | |
connection.Open(); | |
DataTable table = QueueProcessorUtil.GetMessageBatch(connection); | |
if (table != null && table.Rows.Count > 0) | |
{ | |
foreach (var item in table.DataTableToList<QueueMessage>()) | |
{ | |
notifQueue.SendAsync(item); | |
} | |
} | |
} | |
} | |
private static int Process(QueueMessage item) | |
{ | |
QueueMessage message = item; | |
using (var con = new SqlConnection(ConfigurationManager.AppSettings["SqlConnection2"].ToString())) | |
{ | |
con.Open(); | |
using (var tran = con.BeginTransaction()) | |
{ | |
try | |
{ | |
Logger.WriteToLogDb(con, tran, (int)message.queuing_order, "t", 1); | |
tran.Commit(); | |
} | |
catch (Exception ex) | |
{ | |
Logger.Write("fail", ex); | |
tran.Rollback(); | |
} | |
} | |
} | |
return 0; | |
} | |
public static void Stop() | |
{ | |
Stopped = true; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment