Created
November 16, 2017 21:29
-
-
Save hyrmn/7f9032afadf5445b33aa66b263c1c47c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Anotar.Serilog; | |
using Microsoft.ServiceBus; | |
using Microsoft.ServiceBus.Messaging; | |
using RezStream.Infrastructure.Azure; | |
using RezStream.Infrastructure.Extensions; | |
namespace RezCloud.Infrastructure.AzureBus | |
{ | |
/// <summary>
/// Wraps the Azure Service Bus <see cref="NamespaceManager"/> and <see cref="MessagingFactory"/> for the
/// primary connection string. Queues, topics and subscriptions are created on first use, and when a
/// distinct secondary connection string is configured the primary messaging factory is paired with it.
/// </summary>
public class ServiceBus
{
    private readonly string _asbSecondaryUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStrSecondary");
    private readonly string _asbUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStr");
    private readonly bool _hasSecondary;
    private readonly bool _isSyphon;
    private readonly Lazy<MessagingFactory> _messagingFactory;
    private readonly NamingStrategy _namingStrategy =
        new NamingStrategy(TypedCloudConfigurationManager.GetSetting("azure.serviceBus.namingStrategy"));
    private readonly Lazy<NamespaceManager> _primaryNamespace;
    private readonly Lazy<NamespaceManager> _secondaryNamespace;

    // Read and written by whichever caller triggers pairing; volatile so a completed pairing is
    // visible to other threads without extra locking.
    private volatile bool _pairedSuccessfully;

    /// <summary>
    /// Reads the connection strings from configuration and prepares lazily-created namespace managers
    /// and the messaging factory. Nothing is connected until first use.
    /// </summary>
    /// <param name="callerIsSyphon">Passed through as <c>enableSyphon</c> when the namespaces are paired.</param>
    public ServiceBus(bool callerIsSyphon = false)
    {
        _isSyphon = callerIsSyphon;
        // A secondary that is missing or identical to the primary means there is nothing to pair with.
        _hasSecondary = _asbSecondaryUri.HasValue() && _asbSecondaryUri != _asbUri;
        _primaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbUri));
        _messagingFactory = new Lazy<MessagingFactory>(() => MessagingFactory.CreateFromConnectionString(_asbUri));
        if (_hasSecondary)
        {
            _secondaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbSecondaryUri));
        }
    }

    /// <summary>
    /// Pairs the primary and secondary namespaces up front, if a secondary is configured and pairing
    /// has not already succeeded.
    /// </summary>
    public void Initialize()
    {
        EnsurePaired();
    }

    /// <summary>
    /// Returns a client for <paramref name="subscriptionName"/> on <paramref name="topic"/>, creating the
    /// topic and the subscription on the primary namespace when they do not exist yet.
    /// </summary>
    public SubscriptionClient CreateSubscriptionClient(Topics topic, string subscriptionName,
        ReceiveMode receiveMode = ReceiveMode.PeekLock)
    {
        var topicDescription = GetOrCreateTopic(topic);
        if (!_primaryNamespace.Value.SubscriptionExists(topicDescription.Path, subscriptionName))
        {
            try
            {
                _primaryNamespace.Value.CreateSubscription(topicDescription.Path, subscriptionName);
            }
            catch (MessagingEntityAlreadyExistsException)
            {
                // Another caller created the subscription between the existence check and the create call.
            }
        }
        return _messagingFactory.Value.CreateSubscriptionClient(topicDescription.Path, subscriptionName, receiveMode);
    }

    /// <summary>
    /// Returns a client for <paramref name="topic"/>, creating the topic when it does not exist yet.
    /// </summary>
    public TopicClient CreateTopicClient(Topics topic)
    {
        return _messagingFactory.Value.CreateTopicClient(GetOrCreateTopic(topic).Path);
    }

    /// <summary>
    /// Returns a client for <paramref name="queue"/>, creating the queue when it does not exist yet.
    /// </summary>
    public QueueClient CreateQueueClient(Queues queue, ReceiveMode receiveMode = ReceiveMode.PeekLock)
    {
        var queueDescription = GetOrCreateQueue(queue);
        return _messagingFactory.Value.CreateQueueClient(queueDescription.Path, receiveMode);
    }

    /// <summary>
    /// Looks up the queue under its partitioned name on the primary namespace and creates it
    /// (with partitioning enabled) when it is missing.
    /// </summary>
    private QueueDescription GetOrCreateQueue(Queues queue)
    {
        EnsurePaired();
        var partitionedName = _namingStrategy.GetPartitionedName(queue);
        if (_primaryNamespace.Value.QueueExists(partitionedName))
        {
            return _primaryNamespace.Value.GetQueue(partitionedName);
        }
        try
        {
            return _primaryNamespace.Value.CreateQueue(new QueueDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Lost a create race with another caller; the queue exists now, so just fetch it.
            return _primaryNamespace.Value.GetQueue(partitionedName);
        }
    }

    /// <summary>
    /// Looks up the topic under its partitioned name on the primary namespace and creates it
    /// (with partitioning enabled) when it is missing.
    /// </summary>
    private TopicDescription GetOrCreateTopic(Topics topic)
    {
        EnsurePaired();
        var partitionedName = _namingStrategy.GetPartitionedName(topic);
        if (_primaryNamespace.Value.TopicExists(partitionedName))
        {
            return _primaryNamespace.Value.GetTopic(partitionedName);
        }
        try
        {
            return _primaryNamespace.Value.CreateTopic(new TopicDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Lost a create race with another caller; the topic exists now, so just fetch it.
            return _primaryNamespace.Value.GetTopic(partitionedName);
        }
    }

    /// <summary>Runs the pairing step when it is still outstanding.</summary>
    private void EnsurePaired()
    {
        if (PairingIsRequired())
        {
            PairNamespace();
        }
    }

    /// <summary>True when a secondary namespace is configured and pairing has not completed yet.</summary>
    private bool PairingIsRequired()
    {
        return _hasSecondary && !_pairedSuccessfully;
    }

    /// <summary>
    /// Pairs the primary messaging factory with the secondary namespace and blocks until pairing completes.
    /// An <see cref="InvalidOperationException"/> from pairing is treated as "already paired" and recorded
    /// as success; any other failure is logged as fatal and rethrown.
    /// </summary>
    private void PairNamespace()
    {
        //TODO: If this fails, we fail to send the message and it is basically lost. We should be able to replay it from the event store, but that sucks.
        // 1) This pairing process takes 60 seconds to time out in prod, which can make for a very long request.
        // 2) As noted, if it fails, it just fails for this message. It will try and connect again on the next request.
        // 3) In theory, it seems like it would be nice if we could do this at app startup, and if it fails, force the app to restart - but we need to know the app is just starting up/deploying
        //    so that way, the deploy would either succeed after a connection is made OR fail because a connection could not be made.
        try
        {
            var secondaryFactory = MessagingFactory.CreateFromConnectionString(_asbSecondaryUri);
            var pairingOptions = new SendAvailabilityPairedNamespaceOptions(
                secondaryNamespaceManager: _secondaryNamespace.Value,
                messagingFactory: secondaryFactory,
                backlogQueueCount: 10,
                failoverInterval: TimeSpan.Zero,
                enableSyphon: _isSyphon);
            _messagingFactory.Value.PairNamespaceAsync(pairingOptions).Wait();
            if (pairingOptions.BacklogQueueCount < 1)
            {
                LogTo.Fatal("Pairing service bus namespaces did not create any backlog queues! We're running without suspenders here!");
            }
            else
            {
                LogTo.Information("Successfully paired service bus namespaces. Backlog Queue Count: {BacklogQueueCount}",
                    pairingOptions.BacklogQueueCount);
            }
            _pairedSuccessfully = true;
        }
        catch (AggregateException aggEx)
        {
            aggEx.Handle(x =>
            {
                if (x is InvalidOperationException)
                {
                    //PairNamespace can only be called once per MessagingFactory.
                    // - we see this in prod: seems to happen when 2 operations trigger this call simultaneously. One wins, the other throws this ex and is logically ignored.
                    LogTo.Warning(x, "Invalid Operation while trying to pair namespaces. Namespaces already paired.");
                    // The factory is already paired, so record that. Otherwise every later call would
                    // retry, build another secondary factory and end up here again.
                    _pairedSuccessfully = true;
                    return true;
                }
                LogTo.Fatal(x, "Error while trying to pair namespaces");
                return false;
            });
        }
        catch (Exception ex)
        {
            LogTo.Fatal(ex, "Error while trying to pair namespaces");
            throw;
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using Anotar.Serilog; | |
using Microsoft.ServiceBus.Messaging; | |
using RezStream.Infrastructure; | |
namespace RezCloud.Infrastructure.AzureBus | |
{ | |
/// <summary>
/// The service bus queues known to the application, modelled as an <c>Enumeration</c> whose value and
/// display name are both the queue name.
/// </summary>
public class Queues : Enumeration<Queues, string>
{
    // readonly: these are the canonical instances and must not be reassigned by callers.
    public static readonly Queues Email = new Queues("Email", "Email");
    public static readonly Queues Ota = new Queues("Ota", "Ota");
    public static readonly Queues OtaLogging = new Queues("OtaLogging", "OtaLogging");

    private Queues(string value, string displayName)
        : base(value, displayName)
    {
    }
}
/// <summary>
/// The service bus topics known to the application, modelled as an <c>Enumeration</c> whose value and
/// display name are both the topic name.
/// </summary>
public class Topics : Enumeration<Topics, string>
{
    // readonly: this is the canonical instance and must not be reassigned by callers.
    public static readonly Topics Reporting = new Topics("Reporting", "Reporting");

    private Topics(string value, string displayName)
        : base(value, displayName)
    {
    }
}
/// <summary>
/// Sends messages to service bus queues and publishes them to topics. Clients are created on first use
/// and cached per queue/topic name. Sends run through <c>ServiceBusRetryStrategy.RetryPolicyAsync</c> and
/// are not awaited; failures are handled in a continuation. A message rejected for being too large is
/// handed to <c>MessageTooBigPersistance</c> and the event it returns is sent in its place.
/// </summary>
public class ServiceBusDispatcher
{
    private readonly MessageTooBigPersistance _messageTooBigPersistance;
    private readonly ConcurrentDictionary<string, QueueClient> _queues = new ConcurrentDictionary<string, QueueClient>();
    private readonly ServiceBus _serviceBus;
    private readonly ConcurrentDictionary<string, TopicClient> _topics = new ConcurrentDictionary<string, TopicClient>();

    /// <param name="messageTooBigPersistance">Fallback store used when the bus rejects a message as too large.</param>
    public ServiceBusDispatcher(MessageTooBigPersistance messageTooBigPersistance)
    {
        _serviceBus = new ServiceBus(callerIsSyphon: false);
        _messageTooBigPersistance = messageTooBigPersistance;
    }

    /// <summary>Delegates to <c>ServiceBus.Initialize</c> so namespace pairing can happen before the first send.</summary>
    public void Initialize()
    {
        _serviceBus.Initialize();
    }

    /// <summary>
    /// Sends <paramref name="message"/> to <paramref name="queue"/>. Returns as soon as the send has been
    /// started. If the queue client cannot be created the failure is logged and the message is not sent.
    /// </summary>
    public void Send<T>(Message<T> message, Queues queue)
    {
        LogTo.Debug("Sending message {0} to queue {1}", message.StreamId, queue.Value);
        QueueClient queueClient = null;
        try
        {
            //NOTE: The process of connecting/creating queues can take a very long time (5+ minutes locally) if there is no internet available.
            // So although this try/catch allows the app to work, anything that calls this takes forever to complete.
            // This is a pretty corner case, but something to be aware of.
            queueClient = _queues.GetOrAdd(queue.Value, key => _serviceBus.CreateQueueClient(queue));
        }
        catch (Exception ex)
        {
            message.Log()
                .Fatal(ex,
                    "Unable to connect to service bus queue {ServiceBusQueue}. Unable to post {Message}.",
                    queue,
                    typeof(T));
            return;
        }

        // A fresh BrokeredMessage is built inside the lambda so that each retry attempt gets its own instance.
        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => queueClient.SendAsync(CreateBrokeredMessage(message)))
            .ContinueWith(task =>
            {
                if (task.Exception != null)
                {
                    HandleFaultedSend(message,
                        task.Exception,
                        queue,
                        queueClient.Path,
                        queueClient.MessagingFactory.Address);
                }
            });
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/>. Returns as soon as the send has been
    /// started. If the topic client cannot be created the failure is logged and the message is not sent.
    /// </summary>
    public void Publish<T>(Message<T> message, Topics topic)
    {
        LogTo.Debug("Publishing message {0} to topic {1}", message.StreamId, topic.Value);
        TopicClient topicClient = null;
        try
        {
            //NOTE: The process of connecting/creating topics can take a very long time (5+ minutes locally) if there is no internet available.
            // So although this try/catch allows the app to work, anything that calls this takes forever to complete.
            // This is a pretty corner case, but something to be aware of.
            topicClient = _topics.GetOrAdd(topic.Value, key => _serviceBus.CreateTopicClient(topic));
        }
        catch (Exception ex)
        {
            message.Log()
                .Fatal(ex,
                    "Unable to connect to service bus topic {ServiceBusTopic}. Unable to post {Message}.",
                    topic,
                    typeof(T));
            return;
        }

        // A fresh BrokeredMessage is built inside the lambda so that each retry attempt gets its own instance.
        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => topicClient.SendAsync(CreateBrokeredMessage(message)))
            .ContinueWith(task =>
            {
                if (task.Exception != null)
                {
                    HandleFaultedPublish(message,
                        task.Exception,
                        topic,
                        topicClient.Path,
                        topicClient.MessagingFactory.Address);
                }
            });
    }

    /// <summary>
    /// Serializes the message's event into a <see cref="BrokeredMessage"/>, stamping the event type into
    /// the content type and properties, copying the message headers into the properties (headers win on
    /// key clashes) and using the commit id / stream id as correlation id / message id.
    /// </summary>
    private static BrokeredMessage CreateBrokeredMessage<T>(Message<T> message)
    {
        var messageType = message.Event.GetType();
        var eventStream = ServiceBusSerializer.SerailizeToStream(message.Event);
        var msg = new BrokeredMessage(eventStream, ownsStream: true)
        {
            ContentType = messageType.AssemblyQualifiedName,
            Properties =
            {
                {"MessageType", messageType.FullName},
                {"AssemblyName", messageType.AssemblyQualifiedName}
            }
        };
        foreach (var kv in message.Headers)
        {
            msg.Properties[kv.Key] = kv.Value;
        }
        msg.CorrelationId = message.CommitId;
        msg.MessageId = message.StreamId;
        return msg;
    }

    /// <summary>Handles a failed queue send; sends the replacement "too big" event to the same queue if one was produced.</summary>
    private void HandleFaultedSend<T>(Message<T> message, AggregateException exception, Queues queue, string path, Uri address)
    {
        var msgTooBigMsg = HandledFaultedMessage(message, exception, path, address);
        if (msgTooBigMsg != null)
        {
            Send(msgTooBigMsg, queue);
        }
    }

    /// <summary>Handles a failed topic publish; publishes the replacement "too big" event to the same topic if one was produced.</summary>
    private void HandleFaultedPublish<T>(Message<T> message, AggregateException exception, Topics topic, string path, Uri address)
    {
        var msgTooBigMsg = HandledFaultedMessage(message, exception, path, address);
        if (msgTooBigMsg != null)
        {
            Publish(msgTooBigMsg, topic);
        }
    }

    /// <summary>
    /// Decides what to do with a message whose send faulted.
    /// </summary>
    /// <returns>
    /// A replacement <c>MessageTooBigEvent</c> message when the original was rejected for its size and was
    /// persisted successfully; otherwise <c>null</c> after logging the failure.
    /// </returns>
    private Message<MessageTooBigEvent> HandledFaultedMessage<T>(Message<T> message, AggregateException exception, string path,
        Uri address)
    {
        // True only when the failed message is itself a "too big" notification. Such a message is never
        // persisted again, which keeps the fallback from recursing. The check must run in this direction:
        // the reverse test is also true for any base type of the event (e.g. object), which silently
        // dropped those failures.
        var isTooBigNotification = typeof(MessageTooBigEvent).IsAssignableFrom(typeof(T));

        if (!isTooBigNotification && exception.InnerException is MessageSizeExceededException)
        {
            try
            {
                var msgTooBigEvt = _messageTooBigPersistance.Save(message);
                return new Message<MessageTooBigEvent>(msgTooBigEvt, message.Headers);
            }
            catch (Exception ex)
            {
                message.Log().Fatal(ex, "Unable to persist large message to fallback storage");
            }
            return null;
        }

        // Every other failure, including a failed "too big" notification, is logged rather than dropped silently.
        message.Log()
            .Fatal(exception.Flatten(),
                "Unable to post {Message} to path {Path} on {ServiceBusAddress}",
                typeof(T),
                path,
                address);
        return null;
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IdentityModel.Tokens; | |
using System.Linq; | |
using System.Net; | |
using System.Net.Sockets; | |
using System.ServiceModel; | |
using System.Text.RegularExpressions; | |
using Anotar.Serilog; | |
using Microsoft.ServiceBus; | |
using Microsoft.ServiceBus.Messaging; | |
using Polly; | |
namespace RezCloud.Infrastructure.AzureBus | |
{ | |
/// <summary>
/// Polly retry policies for service bus calls, plus the transient-fault classification they share.
/// Retro-fit into Polly from:
/// http://topaz.codeplex.com/SourceControl/latest#source/Source/TransientFaultHandling.ServiceBus/ServiceBusTransientErrorDetectionStrategy.cs
/// Referenced here: https://msdn.microsoft.com/en-us/library/dn440719(v=pandp.60).aspx
/// </summary>
public class ServiceBusRetryStrategy
{
    private static readonly Regex AcsErrorCodeRegex = new Regex(@"Error:Code:(\d+):SubCode:(\w\d+)",
        RegexOptions.IgnoreCase | RegexOptions.Compiled);

    // HTTP status codes that are worth retrying.
    private static readonly int[] RetryableHttpCodes =
    {
        (int) HttpStatusCode.InternalServerError,
        (int) HttpStatusCode.GatewayTimeout,
        (int) HttpStatusCode.ServiceUnavailable,
        (int) HttpStatusCode.RequestTimeout
    };

    // WebException statuses that are worth retrying regardless of the HTTP response.
    private static readonly WebExceptionStatus[] RetryableWebStatuses =
    {
        WebExceptionStatus.ConnectionClosed,
        WebExceptionStatus.Timeout,
        WebExceptionStatus.RequestCanceled
    };

    /// <summary>Synchronous policy: retries transient failures after waiting 1, 2 and then 3 seconds.</summary>
    public static Policy RetryPolicy = Policy
        .Handle<Exception>(ShouldRetry)
        .WaitAndRetry(BackoffSchedule());

    /// <summary>Asynchronous policy: retries transient failures after waiting 1, 2 and then 3 seconds.</summary>
    public static Policy RetryPolicyAsync = Policy
        .Handle<Exception>(ShouldRetry)
        .WaitAndRetryAsync(BackoffSchedule());

    /// <summary>Builds the wait intervals used between attempts; each policy gets its own array.</summary>
    private static TimeSpan[] BackoffSchedule()
    {
        return new[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(2),
            TimeSpan.FromSeconds(3)
        };
    }

    /// <summary>
    /// Retry predicate shared by both policies: an exception is retried when it, or its immediate inner
    /// exception, is classified as transient. Transient failures are logged as warnings.
    /// </summary>
    private static bool ShouldRetry(Exception ex)
    {
        if (ex == null)
        {
            return false;
        }

        // CheckIsTransient returns false for null, so a missing inner exception needs no separate guard.
        var transient = CheckIsTransient(ex) || CheckIsTransient(ex.InnerException);
        if (transient)
        {
            LogTo.Warning(ex, "Transient error encountered while sending to the service bus");
        }
        return transient;
    }

    /// <summary>
    /// Classifies a single exception as transient (worth retrying) or not. The order of the checks
    /// matters because several of the exception types derive from one another.
    /// </summary>
    private static bool CheckIsTransient(Exception ex)
    {
        if (ex == null)
        {
            return false;
        }

        // Service bus exceptions carry their own verdict. IsTransient already covers lock-lost,
        // entity-exists and entity-not-found (not transient) as well as communication and
        // server-busy (transient).
        var busFault = ex as MessagingException;
        if (busFault != null)
        {
            return busFault.IsTransient;
        }

        // Checked before the general CommunicationException test further down, which would otherwise
        // classify them as transient.
        if (ex is FaultException || ex is CommunicationObjectFaultedException)
        {
            return false;
        }

        if (ex is TimeoutException || IsRetryableWebFailure(ex as WebException))
        {
            return true;
        }

        // EndpointNotFoundException may occur when a listener and a consumer are being initialized out
        // of sync (e.g. the consumer is reaching to a listener that is still setting up its Service Host).
        if (ex is SecurityTokenException
            || ex is ServerTooBusyException
            || ex is ServerErrorException
            || ex is ProtocolException
            || ex is EndpointNotFoundException
            || ex is CommunicationException)
        {
            return true;
        }

        var socketFault = ex as SocketException;
        if (socketFault != null)
        {
            return socketFault.SocketErrorCode == SocketError.TimedOut;
        }

        return ex is UnauthorizedAccessException && IsRetryableAcsFailure(ex.Message);
    }

    /// <summary>
    /// True when the web exception has a retryable status, or is a protocol error whose HTTP response
    /// carries a retryable status code. A null argument is not retryable.
    /// </summary>
    private static bool IsRetryableWebFailure(WebException webFault)
    {
        if (webFault == null)
        {
            return false;
        }
        if (RetryableWebStatuses.Contains(webFault.Status))
        {
            return true;
        }
        if (webFault.Status != WebExceptionStatus.ProtocolError)
        {
            return false;
        }

        var httpResponse = webFault.Response as HttpWebResponse;
        return httpResponse != null && RetryableHttpCodes.Contains((int) httpResponse.StatusCode);
    }

    /// <summary>
    /// Provides some resilience against the following fault that was seen a few times:
    /// System.UnauthorizedAccessException: The token provider was unable to provide a security token while accessing 'https://xxx.accesscontrol.windows.net/WRAPv0.9/'.
    /// Token provider returned message: 'Error:Code:500:SubCode:T9002:Detail:An internal network error occured. Please try again.'.
    /// The embedded error code is extracted from the message and retried when it is a retryable HTTP status code.
    /// </summary>
    private static bool IsRetryableAcsFailure(string faultMessage)
    {
        var acsMatch = AcsErrorCodeRegex.Match(faultMessage);
        int acsCode;
        return acsMatch.Success
               && acsMatch.Groups.Count > 1
               && int.TryParse(acsMatch.Groups[1].Value, out acsCode)
               && RetryableHttpCodes.Contains(acsCode);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment