Created
December 16, 2019 21:23
-
-
Save ramonsmits/91923a8f7cad3435179ff375af46b0e2 to your computer and use it in GitHub Desktop.
Retrieves MSMQ endpoint instance data stored in a SQL Server instead from the file system.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE [dbo].[EndpointInstance] | |
( | |
[EndpointName] VARCHAR(128) NOT NULL, | |
[Discriminator] VARCHAR(128) NULL DEFAULT NULL, | |
[Machine] VARCHAR(256) NOT NULL | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Data.SqlClient; | |
using System.Diagnostics; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NServiceBus; | |
using NServiceBus.Features; | |
using NServiceBus.Logging; | |
using NServiceBus.Routing; | |
class SqlEndpointInstanceSource : Feature | |
{ | |
static readonly ILog Log = LogManager.GetLogger(typeof(SqlEndpointInstanceSource)); | |
static readonly string ConnectionString = "Server=.;Database=MySqlCatalog;Trusted_Connection=True;"; | |
static readonly TimeSpan FetchInterval = TimeSpan.FromSeconds(10); | |
static readonly TimeSpan MaxFailureWindow = TimeSpan.FromMinutes(5); | |
public SqlEndpointInstanceSource() | |
{ | |
EnableByDefault(); | |
} | |
protected override void Setup(FeatureConfigurationContext context) | |
{ | |
var endpointInstances = context.Settings.Get<EndpointInstances>(); | |
context.RegisterStartupTask(b => new MonitorTask(endpointInstances, b.Build<CriticalError>())); | |
} | |
class MonitorTask : FeatureStartupTask | |
{ | |
readonly EndpointInstances endpointInstances; | |
readonly CriticalError CriticalError; | |
Timer timer; | |
int failures; | |
Stopwatch firstFailureAt; | |
public MonitorTask(EndpointInstances endpointInstances, CriticalError criticalError) | |
{ | |
this.endpointInstances = endpointInstances; | |
CriticalError = criticalError; | |
timer = new Timer(s => UpdateInstances(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); | |
} | |
protected override async Task OnStart(IMessageSession session) | |
{ | |
await ReloadData().ConfigureAwait(false); // First time MUST work to prime routing table from SQL source. | |
timer.Change(FetchInterval, Timeout.InfiniteTimeSpan); | |
} | |
protected override Task OnStop(IMessageSession session) | |
{ | |
var t = timer; | |
timer = null; | |
t.Dispose(); | |
return Task.CompletedTask; | |
} | |
async void UpdateInstances() | |
{ | |
try | |
{ | |
await ReloadData().ConfigureAwait(false); | |
timer?.Change(FetchInterval, Timeout.InfiniteTimeSpan); | |
if (failures > 0) | |
{ | |
Log.InfoFormat("Recovered after {0} failures in {1}.", failures, firstFailureAt); | |
failures = 0; | |
} | |
} | |
catch (Exception ex) | |
{ | |
if (failures == 0) firstFailureAt = Stopwatch.StartNew(); | |
var next = TimeSpan.FromMilliseconds(100 * Math.Pow(2, failures++)); | |
if (firstFailureAt.Elapsed > MaxFailureWindow) | |
{ | |
CriticalError.Raise($"{nameof(SqlEndpointInstanceSource)} is unable to retrieve retrieve endpoint instance data.", ex); | |
} | |
Log.Warn($"Failure retrieving routing information, retrying in '{next}'.", ex); | |
if (next > FetchInterval) | |
{ | |
next = FetchInterval; | |
} | |
timer?.Change(next, Timeout.InfiniteTimeSpan); | |
} | |
} | |
async Task ReloadData() | |
{ | |
var start = Stopwatch.StartNew(); | |
var newInstances = new List<EndpointInstance>(); | |
using (var con = new SqlConnection(ConnectionString)) | |
{ | |
await con.OpenAsync().ConfigureAwait(false); | |
using (var cmd = new SqlCommand("SELECT EndpointNames,Discriminator,Machine FROM EndpointInstance", con)) | |
{ | |
using (var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false)) | |
{ | |
while (await reader.ReadAsync().ConfigureAwait(false)) | |
{ | |
var endpoint = (string)reader[0]; | |
var discriminator = reader[1] == DBNull.Value ? null : (string)reader[1]; | |
var machine = (string)reader[2]; | |
newInstances.Add(new EndpointInstance(endpoint, discriminator).AtMachine(machine)); | |
} | |
} | |
} | |
} | |
if (newInstances.Count == 0) throw new InvalidOperationException($"{nameof(SqlEndpointInstanceSource)} is expecting at least 1 endpoint instance item from source."); | |
endpointInstances.AddOrReplaceInstances(nameof(SqlEndpointInstanceSource), newInstances); | |
Log.DebugFormat("Routing info updated in {0} with {1} endpoint instance(s).", start.Elapsed, newInstances.Count); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment