Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dmitribodiu/96cbb989969b36083d28ead2dd68581c to your computer and use it in GitHub Desktop.
Save dmitribodiu/96cbb989969b36083d28ead2dd68581c to your computer and use it in GitHub Desktop.
Queue Processor (TPL One Producer/Multiple Consumers)
using System;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ServiceBrokerQueueProcessorTPL
{
public static class QueueProcessor
{
public static ActionBlock<QueueMessage> notifQueue { get; set; }
private static ExecutionDataflowBlockOptions processBlockOptions
{
get
{
return new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 8
};
}
}
public static bool Stopped { get; private set; }
static WsLogger Logger { get; set; }
static QueueProcessor()
{
notifQueue = new ActionBlock<QueueMessage>((item) => Process(item),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
Stopped = false;
Logger = new WsLogger();
}
public static void Start()
{
try
{
while (!Stopped)
{
Produce();
}
}
catch (Exception ex)
{
throw ex;
}
}
private static void Produce()
{
using (var connection = new SqlConnection(ConfigurationManager.AppSettings["SqlConnection"]))
{
if (connection.State != System.Data.ConnectionState.Open)
connection.Open();
DataTable table = QueueProcessorUtil.GetMessageBatch(connection);
if (table != null && table.Rows.Count > 0)
{
foreach (var item in table.DataTableToList<QueueMessage>())
{
notifQueue.SendAsync(item);
}
}
}
}
private static int Process(QueueMessage item)
{
QueueMessage message = item;
using (var con = new SqlConnection(ConfigurationManager.AppSettings["SqlConnection2"].ToString()))
{
con.Open();
using (var tran = con.BeginTransaction())
{
try
{
Logger.WriteToLogDb(con, tran, (int)message.queuing_order, "t", 1);
tran.Commit();
}
catch (Exception ex)
{
Logger.Write("fail", ex);
tran.Rollback();
}
}
}
return 0;
}
public static void Stop()
{
Stopped = true;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment