Skip to content

Instantly share code, notes, and snippets.

@ramonsmits
Created December 16, 2019 21:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramonsmits/91923a8f7cad3435179ff375af46b0e2 to your computer and use it in GitHub Desktop.
Save ramonsmits/91923a8f7cad3435179ff375af46b0e2 to your computer and use it in GitHub Desktop.
Retrieves MSMQ endpoint instance data from a SQL Server table instead of from the file system.
-- Source table for endpoint instance mappings: each row describes one endpoint instance
-- (endpoint name, optional discriminator, machine).
CREATE TABLE [dbo].[EndpointInstance]
(
-- Endpoint name; read as a non-null string and used to construct the EndpointInstance.
[EndpointName] VARCHAR(128) NOT NULL,
-- Optional discriminator; a NULL value is read as "no discriminator".
[Discriminator] VARCHAR(128) NULL DEFAULT NULL,
-- Machine name; read as a non-null string and applied via AtMachine(...).
[Machine] VARCHAR(256) NOT NULL
)
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Logging;
using NServiceBus.Routing;
/// <summary>
/// NServiceBus feature that loads endpoint instance mappings from the SQL Server table
/// <c>EndpointInstance</c> and pushes them into the <see cref="EndpointInstances"/> registry,
/// refreshing them every <see cref="FetchInterval"/>.
/// </summary>
class SqlEndpointInstanceSource : Feature
{
    static readonly ILog Log = LogManager.GetLogger(typeof(SqlEndpointInstanceSource));

    // NOTE(review): hard-coded connection string; consider sourcing it from configuration.
    static readonly string ConnectionString = "Server=.;Database=MySqlCatalog;Trusted_Connection=True;";

    // Interval between successful refreshes; also the upper bound for the retry back-off.
    static readonly TimeSpan FetchInterval = TimeSpan.FromSeconds(10);

    // Once failures have persisted longer than this, every further failure raises a critical error.
    static readonly TimeSpan MaxFailureWindow = TimeSpan.FromMinutes(5);

    public SqlEndpointInstanceSource()
    {
        EnableByDefault();
    }

    /// <summary>
    /// Registers the startup task that primes and then periodically refreshes the routing data.
    /// </summary>
    protected override void Setup(FeatureConfigurationContext context)
    {
        var endpointInstances = context.Settings.Get<EndpointInstances>();
        context.RegisterStartupTask(b => new MonitorTask(endpointInstances, b.Build<CriticalError>()));
    }

    /// <summary>
    /// Startup task that performs the initial load and re-arms a one-shot timer after every
    /// refresh attempt, so refreshes never overlap.
    /// </summary>
    class MonitorTask : FeatureStartupTask
    {
        // First retry delay; doubled on every consecutive failure up to FetchInterval.
        const double InitialRetryDelayMilliseconds = 100;

        readonly EndpointInstances endpointInstances;
        readonly CriticalError criticalError;

        // Set to null by OnStop; always access through Reschedule/OnStop.
        Timer timer;

        // Number of consecutive failed refreshes; only touched from the (non-overlapping) timer callback.
        int failures;

        // Started at the first failure of the current failure streak.
        Stopwatch firstFailureAt;

        public MonitorTask(EndpointInstances endpointInstances, CriticalError criticalError)
        {
            this.endpointInstances = endpointInstances;
            this.criticalError = criticalError;
            // Created disarmed; OnStart arms it after the initial load succeeded.
            timer = new Timer(s => UpdateInstances(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        }

        /// <summary>
        /// Loads the routing data once (failure here propagates to the caller) and then
        /// schedules the first periodic refresh.
        /// </summary>
        protected override async Task OnStart(IMessageSession session)
        {
            await ReloadData().ConfigureAwait(false); // First time MUST work to prime routing table from SQL source.
            Reschedule(FetchInterval);
        }

        /// <summary>
        /// Stops further refreshes. Safe to call more than once.
        /// </summary>
        protected override Task OnStop(IMessageSession session)
        {
            // Atomically detach the timer so a concurrent callback sees null and does not re-arm it.
            var detached = Interlocked.Exchange(ref timer, null);
            detached?.Dispose();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Timer callback: refreshes the routing data and schedules the next attempt, using an
        /// exponential back-off (capped at <see cref="FetchInterval"/>) while failures persist.
        /// </summary>
        // async void is required by the timer callback shape; nothing may escape this method,
        // because an unhandled exception here would terminate the process.
        async void UpdateInstances()
        {
            try
            {
                await ReloadData().ConfigureAwait(false);
                Reschedule(FetchInterval);

                if (failures > 0)
                {
                    Log.InfoFormat("Recovered after {0} failures in {1}.", failures, firstFailureAt.Elapsed);
                    failures = 0;
                    firstFailureAt = null;
                }
            }
            catch (Exception ex)
            {
                if (failures == 0)
                {
                    firstFailureAt = Stopwatch.StartNew();
                }

                var next = NextRetryDelay(failures++);

                if (firstFailureAt.Elapsed > MaxFailureWindow)
                {
                    criticalError.Raise($"{nameof(SqlEndpointInstanceSource)} is unable to retrieve endpoint instance data.", ex);
                }

                Log.Warn($"Failure retrieving routing information, retrying in '{next}'.", ex);
                Reschedule(next);
            }
        }

        /// <summary>
        /// Returns the delay before the next retry: 100ms doubled per previous failure,
        /// never more than <see cref="FetchInterval"/>.
        /// </summary>
        static TimeSpan NextRetryDelay(int previousFailures)
        {
            // Clamp BEFORE converting: the raw exponential quickly exceeds TimeSpan.MaxValue
            // and TimeSpan.FromMilliseconds would throw OverflowException.
            var milliseconds = Math.Min(
                InitialRetryDelayMilliseconds * Math.Pow(2, previousFailures),
                FetchInterval.TotalMilliseconds);
            return TimeSpan.FromMilliseconds(milliseconds);
        }

        /// <summary>
        /// Arms the one-shot timer unless the task has been stopped.
        /// </summary>
        void Reschedule(TimeSpan dueTime)
        {
            var current = Volatile.Read(ref timer);
            if (current == null)
            {
                return;
            }

            try
            {
                current.Change(dueTime, Timeout.InfiniteTimeSpan);
            }
            catch (ObjectDisposedException)
            {
                // OnStop disposed the timer between the read and the call; nothing left to schedule.
            }
        }

        /// <summary>
        /// Reads all rows from the <c>EndpointInstance</c> table and replaces the instances
        /// registered under this source's key.
        /// </summary>
        /// <exception cref="InvalidOperationException">The table returned no rows.</exception>
        async Task ReloadData()
        {
            var start = Stopwatch.StartNew();
            var newInstances = new List<EndpointInstance>();

            using (var con = new SqlConnection(ConnectionString))
            {
                await con.OpenAsync().ConfigureAwait(false);

                // Column names must match the EndpointInstance table definition.
                using (var cmd = new SqlCommand("SELECT EndpointName,Discriminator,Machine FROM EndpointInstance", con))
                using (var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false))
                {
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    {
                        var endpoint = reader.GetString(0);
                        var discriminator = reader.IsDBNull(1) ? null : reader.GetString(1);
                        var machine = reader.GetString(2);
                        newInstances.Add(new EndpointInstance(endpoint, discriminator).AtMachine(machine));
                    }
                }
            }

            // An empty result is treated as a failure so existing routing data is not replaced with nothing.
            if (newInstances.Count == 0)
            {
                throw new InvalidOperationException($"{nameof(SqlEndpointInstanceSource)} is expecting at least 1 endpoint instance item from source.");
            }

            endpointInstances.AddOrReplaceInstances(nameof(SqlEndpointInstanceSource), newInstances);
            Log.DebugFormat("Routing info updated in {0} with {1} endpoint instance(s).", start.Elapsed, newInstances.Count);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment