Skip to content

Instantly share code, notes, and snippets.

@hyrmn
Created November 16, 2017 21:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hyrmn/7f9032afadf5445b33aa66b263c1c47c to your computer and use it in GitHub Desktop.
Save hyrmn/7f9032afadf5445b33aa66b263c1c47c to your computer and use it in GitHub Desktop.
using System;
using Anotar.Serilog;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using RezStream.Infrastructure.Azure;
using RezStream.Infrastructure.Extensions;
namespace RezCloud.Infrastructure.AzureBus
{
/// <summary>
/// Entry point to one Azure Service Bus namespace, plus an optional secondary namespace used for paired-namespace
/// send availability. Connection strings and the naming strategy are read from cloud configuration. The namespace
/// managers and the messaging factory are wrapped in <see cref="Lazy{T}"/> and created on first use.
/// </summary>
public class ServiceBus
{
    private readonly string _asbSecondaryUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStrSecondary");
    private readonly string _asbUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStr");
    private readonly bool _hasSecondary;
    private readonly bool _isSyphon;
    private readonly Lazy<MessagingFactory> _messagingFactory;
    private readonly NamingStrategy _namingStrategy =
        new NamingStrategy(TypedCloudConfigurationManager.GetSetting("azure.serviceBus.namingStrategy"));
    private readonly Lazy<NamespaceManager> _primaryNamespace;

    // Only assigned when a distinct secondary connection string is configured; null otherwise.
    private readonly Lazy<NamespaceManager> _secondaryNamespace;

    // Set once pairing succeeds. Pairing is triggered from concurrent operations (see PairNamespace), so the flag
    // is volatile to make a successful pairing promptly visible to other threads.
    private volatile bool _pairedSuccessfully;

    /// <summary>
    /// Creates the wrapper. The namespace managers and messaging factory are not created until first used.
    /// </summary>
    /// <param name="callerIsSyphon">Forwarded to the paired-namespace options as <c>enableSyphon</c>.</param>
    public ServiceBus(bool callerIsSyphon = false)
    {
        _isSyphon = callerIsSyphon;
        // A secondary is only used when configured AND different from the primary connection string.
        _hasSecondary = _asbSecondaryUri.HasValue() && _asbSecondaryUri != _asbUri;
        _primaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbUri));
        _messagingFactory = new Lazy<MessagingFactory>(() => MessagingFactory.CreateFromConnectionString(_asbUri));
        if (_hasSecondary)
        {
            _secondaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbSecondaryUri));
        }
    }

    /// <summary>
    /// Eagerly pairs the primary messaging factory with the secondary namespace, if one is configured and pairing
    /// has not yet succeeded. Client creation performs the same check on demand.
    /// </summary>
    public void Initialize()
    {
        EnsurePaired();
    }

    /// <summary>
    /// Returns a client for <paramref name="subscriptionName"/> on the given topic. The topic and the subscription
    /// are created if they do not exist yet.
    /// </summary>
    public SubscriptionClient CreateSubscriptionClient(Topics topic, string subscriptionName,
        ReceiveMode receiveMode = ReceiveMode.PeekLock)
    {
        var topicPath = GetOrCreateTopic(topic).Path;
        var namespaceManager = _primaryNamespace.Value;
        if (!namespaceManager.SubscriptionExists(topicPath, subscriptionName))
        {
            try
            {
                namespaceManager.CreateSubscription(topicPath, subscriptionName);
            }
            catch (MessagingEntityAlreadyExistsException)
            {
                // Another thread or process created it between the existence check and the create call.
                // The subscription exists either way, so carry on.
                LogTo.Debug("Subscription {0} on {1} was created concurrently; using the existing one", subscriptionName, topicPath);
            }
        }
        return _messagingFactory.Value.CreateSubscriptionClient(topicPath, subscriptionName, receiveMode);
    }

    /// <summary>Returns a client for the given topic, creating the topic if it does not exist yet.</summary>
    public TopicClient CreateTopicClient(Topics topic)
    {
        return _messagingFactory.Value.CreateTopicClient(GetOrCreateTopic(topic).Path);
    }

    /// <summary>Returns a client for the given queue, creating the queue if it does not exist yet.</summary>
    public QueueClient CreateQueueClient(Queues queue, ReceiveMode receiveMode = ReceiveMode.PeekLock)
    {
        var queueDescription = GetOrCreateQueue(queue);
        return _messagingFactory.Value.CreateQueueClient(queueDescription.Path, receiveMode);
    }

    // Pairs namespaces if needed, then returns the (partitioned) queue, creating it when missing.
    private QueueDescription GetOrCreateQueue(Queues queue)
    {
        EnsurePaired();
        var partitionedName = _namingStrategy.GetPartitionedName(queue);
        var namespaceManager = _primaryNamespace.Value;
        if (namespaceManager.QueueExists(partitionedName))
        {
            return namespaceManager.GetQueue(partitionedName);
        }
        try
        {
            return namespaceManager.CreateQueue(new QueueDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Lost a create race with a concurrent caller or another process. The queue exists now, so read it back
            // instead of failing (callers treat a failure here as "cannot connect" and drop the message).
            LogTo.Debug("Queue {0} was created concurrently; using the existing queue", partitionedName);
            return namespaceManager.GetQueue(partitionedName);
        }
    }

    // Pairs namespaces if needed, then returns the (partitioned) topic, creating it when missing.
    private TopicDescription GetOrCreateTopic(Topics topic)
    {
        EnsurePaired();
        var partitionedName = _namingStrategy.GetPartitionedName(topic);
        var namespaceManager = _primaryNamespace.Value;
        if (namespaceManager.TopicExists(partitionedName))
        {
            return namespaceManager.GetTopic(partitionedName);
        }
        try
        {
            return namespaceManager.CreateTopic(new TopicDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Lost a create race with a concurrent caller or another process. The topic exists now, so read it back.
            LogTo.Debug("Topic {0} was created concurrently; using the existing topic", partitionedName);
            return namespaceManager.GetTopic(partitionedName);
        }
    }

    // Runs PairNamespace when a secondary namespace is configured and pairing has not succeeded yet.
    private void EnsurePaired()
    {
        if (PairingIsRequired())
        {
            PairNamespace();
        }
    }

    private bool PairingIsRequired()
    {
        return _hasSecondary && !_pairedSuccessfully;
    }

    // Pairs the primary messaging factory with the secondary namespace (SendAvailabilityPairedNamespaceOptions).
    // Blocks the calling thread until PairNamespaceAsync completes.
    //
    // TODO: If this fails, we fail to send the message and it is basically lost. We should be able to replay it
    // from the event store, but that sucks.
    //  1) This pairing process takes 60 seconds to time out in prod, which can make for a very long request.
    //  2) As noted, if it fails, it just fails for this message. It will try to connect again on the next request.
    //  3) In theory, it would be nice to do this at app startup and, if it fails, force the app to restart - but we
    //     would need to know the app is just starting up/deploying, so that the deploy would either succeed after a
    //     connection is made OR fail because a connection could not be made.
    private void PairNamespace()
    {
        try
        {
            var secondaryFactory = MessagingFactory.CreateFromConnectionString(_asbSecondaryUri);
            var pairingOptions = new SendAvailabilityPairedNamespaceOptions(
                secondaryNamespaceManager: _secondaryNamespace.Value,
                messagingFactory: secondaryFactory,
                backlogQueueCount: 10,
                failoverInterval: TimeSpan.Zero,
                enableSyphon: _isSyphon);

            // Wait() surfaces failures as an AggregateException, which is handled below.
            _messagingFactory.Value.PairNamespaceAsync(pairingOptions).Wait();

            // NOTE(review): backlogQueueCount was set to 10 above; whether PairNamespaceAsync updates this property
            // to the number of backlog queues actually found is not visible here - confirm against the SDK.
            if (pairingOptions.BacklogQueueCount < 1)
            {
                LogTo.Fatal("Pairing service bus namespaces did not create any backlog queues! We're running without suspenders here!");
            }
            else
            {
                LogTo.Information("Successfully paired service bus namespaces. Backlog Queue Count: {BacklogQueueCount}",
                    pairingOptions.BacklogQueueCount);
            }
            _pairedSuccessfully = true;
        }
        catch (AggregateException aggEx)
        {
            // Handle() rethrows a new AggregateException holding every inner exception the predicate returns false
            // for, so only the "already paired" case is swallowed here.
            aggEx.Handle(inner =>
            {
                if (inner is InvalidOperationException)
                {
                    // PairNamespace can only be called once per MessagingFactory.
                    // Seen in prod when two operations trigger this call simultaneously: one wins, the other throws
                    // this exception and is logically ignored.
                    LogTo.Warning(inner, "Invalid Operation while trying to pair namespaces. Namespaces already paired.");
                    return true;
                }
                LogTo.Fatal(inner, "Error while trying to pair namespaces");
                return false;
            });
        }
        catch (Exception ex)
        {
            LogTo.Fatal(ex, "Error while trying to pair namespaces");
            throw;
        }
    }
}
}
using System;
using System.Collections.Concurrent;
using Anotar.Serilog;
using Microsoft.ServiceBus.Messaging;
using RezStream.Infrastructure;
namespace RezCloud.Infrastructure.AzureBus
{
/// <summary>
/// Enumeration of the Service Bus queues this application sends to. The well-known instances are
/// <c>static readonly</c> so they cannot be replaced at runtime; the private constructor prevents new values.
/// </summary>
public class Queues : Enumeration<Queues, string>
{
    public static readonly Queues Email = new Queues("Email", "Email");
    public static readonly Queues Ota = new Queues("Ota", "Ota");
    public static readonly Queues OtaLogging = new Queues("OtaLogging", "OtaLogging");

    private Queues(string value, string displayName)
        : base(value, displayName)
    {
    }
}
/// <summary>
/// Enumeration of the Service Bus topics this application publishes to. The well-known instances are
/// <c>static readonly</c> so they cannot be replaced at runtime; the private constructor prevents new values.
/// </summary>
public class Topics : Enumeration<Topics, string>
{
    public static readonly Topics Reporting = new Topics("Reporting", "Reporting");

    private Topics(string value, string displayName)
        : base(value, displayName)
    {
    }
}
/// <summary>
/// Sends messages to Service Bus queues and publishes them to topics, caching one client per queue/topic value.
/// Sends are fire-and-forget: the async retry policy runs in the background and failures are logged. A message
/// rejected as too large is saved to fallback storage and replaced by a <c>MessageTooBigEvent</c> message sent to
/// the same destination.
/// </summary>
public class ServiceBusDispatcher
{
    private readonly MessageTooBigPersistance _messageTooBigPersistance;
    private readonly ConcurrentDictionary<string, QueueClient> _queues = new ConcurrentDictionary<string, QueueClient>();
    private readonly ServiceBus _serviceBus;
    private readonly ConcurrentDictionary<string, TopicClient> _topics = new ConcurrentDictionary<string, TopicClient>();

    public ServiceBusDispatcher(MessageTooBigPersistance messageTooBigPersistance)
    {
        _serviceBus = new ServiceBus(callerIsSyphon: false);
        _messageTooBigPersistance = messageTooBigPersistance;
    }

    /// <summary>Eagerly initializes (pairs) the underlying service bus.</summary>
    public void Initialize()
    {
        _serviceBus.Initialize();
    }

    /// <summary>
    /// Starts sending <paramref name="message"/> to <paramref name="queue"/> and returns without waiting for the
    /// send to complete. If the queue client cannot be obtained, the failure is logged and the message is not sent.
    /// </summary>
    public void Send<T>(Message<T> message, Queues queue)
    {
        LogTo.Debug("Sending message {0} to queue {1}", message.StreamId, queue.Value);
        QueueClient queueClient;
        try
        {
            // NOTE: The process of connecting/creating queues can take a very long time (5+ minutes locally) if there
            // is no internet available. So although this try/catch allows the app to work, anything that calls this
            // takes forever to complete. This is a pretty corner case, but something to be aware of.
            queueClient = _queues.GetOrAdd(queue.Value, key => _serviceBus.CreateQueueClient(queue));
        }
        catch (Exception ex)
        {
            message.Log()
                .Fatal(ex,
                    "Unable to connect to service bus queue {ServiceBusQueue}. Unable to post {Message}.",
                    queue,
                    typeof(T));
            return;
        }

        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => queueClient.SendAsync(CreateBrokeredMessage(message)))
            .ContinueWith(task =>
                {
                    if (task.Exception != null)
                    {
                        HandleFaultedSend(message,
                            task.Exception,
                            queue,
                            queueClient.Path,
                            queueClient.MessagingFactory.Address);
                    }
                },
                // Explicit scheduler: without it the continuation runs on TaskScheduler.Current (CA2008).
                System.Threading.Tasks.TaskScheduler.Default);
    }

    /// <summary>
    /// Starts publishing <paramref name="message"/> to <paramref name="topic"/> and returns without waiting for the
    /// publish to complete. If the topic client cannot be obtained, the failure is logged and the message is not sent.
    /// </summary>
    public void Publish<T>(Message<T> message, Topics topic)
    {
        LogTo.Debug("Publishing message {0} to topic {1}", message.StreamId, topic.Value);
        TopicClient topicClient;
        try
        {
            // NOTE: The process of connecting/creating topics can take a very long time (5+ minutes locally) if there
            // is no internet available. So although this try/catch allows the app to work, anything that calls this
            // takes forever to complete. This is a pretty corner case, but something to be aware of.
            topicClient = _topics.GetOrAdd(topic.Value, key => _serviceBus.CreateTopicClient(topic));
        }
        catch (Exception ex)
        {
            message.Log()
                .Fatal(ex,
                    "Unable to connect to service bus topic {ServiceBusTopic}. Unable to post {Message}.",
                    topic,
                    typeof(T));
            return;
        }

        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => topicClient.SendAsync(CreateBrokeredMessage(message)))
            .ContinueWith(task =>
                {
                    if (task.Exception != null)
                    {
                        HandleFaultedPublish(message,
                            task.Exception,
                            topic,
                            topicClient.Path,
                            topicClient.MessagingFactory.Address);
                    }
                },
                // Explicit scheduler: without it the continuation runs on TaskScheduler.Current (CA2008).
                System.Threading.Tasks.TaskScheduler.Default);
    }

    // Serializes the event into a BrokeredMessage (which owns the stream). Type metadata is taken from the event's
    // runtime type, and the message's headers are copied into the broker properties.
    private static BrokeredMessage CreateBrokeredMessage<T>(Message<T> message)
    {
        var messageType = message.Event.GetType();
        var eventStream = ServiceBusSerializer.SerailizeToStream(message.Event);
        var msg = new BrokeredMessage(eventStream, ownsStream: true)
        {
            ContentType = messageType.AssemblyQualifiedName,
            Properties =
            {
                {"MessageType", messageType.FullName},
                {"AssemblyName", messageType.AssemblyQualifiedName}
            }
        };
        foreach (var kv in message.Headers)
        {
            msg.Properties[kv.Key] = kv.Value;
        }
        msg.CorrelationId = message.CommitId;
        msg.MessageId = message.StreamId;
        return msg;
    }

    // Failure continuation for Send: logs the failure, or re-sends the substitute message to the same queue.
    private void HandleFaultedSend<T>(Message<T> message, AggregateException exception, Queues queue, string path, Uri address)
    {
        var msgTooBigMsg = HandleFaultedMessage(message, exception, path, address);
        if (msgTooBigMsg != null)
        {
            Send(msgTooBigMsg, queue);
        }
    }

    // Failure continuation for Publish: logs the failure, or re-publishes the substitute message to the same topic.
    private void HandleFaultedPublish<T>(Message<T> message, AggregateException exception, Topics topic, string path, Uri address)
    {
        var msgTooBigMsg = HandleFaultedMessage(message, exception, path, address);
        if (msgTooBigMsg != null)
        {
            Publish(msgTooBigMsg, topic);
        }
    }

    // Decides what to do with a failed send.
    // - If the broker rejected an ordinary message as too large, the message is persisted to fallback storage and
    //   the returned MessageTooBigEvent message should be sent in its place.
    // - Otherwise, including when the failed message already is such a substitute (so it never recurses), the failure
    //   is logged and null is returned.
    private Message<MessageTooBigEvent> HandleFaultedMessage<T>(Message<T> message, AggregateException exception, string path,
        Uri address)
    {
        if (exception.InnerException is MessageSizeExceededException && !IsMessageTooBigNotification(message))
        {
            try
            {
                var msgTooBigEvt = _messageTooBigPersistance.Save(message);
                return new Message<MessageTooBigEvent>(msgTooBigEvt, message.Headers);
            }
            catch (Exception ex)
            {
                message.Log().Fatal(ex, "Unable to persist large message to fallback storage");
                return null;
            }
        }

        message.Log()
            .Fatal(exception.Flatten(),
                "Unable to post {Message} to path {Path} on {ServiceBusAddress}",
                typeof(T),
                path,
                address);
        return null;
    }

    // True when the message is itself the MessageTooBigEvent substitute. The runtime type of the event is checked as
    // well as T, because T may be a base type or interface of the concrete event (CreateBrokeredMessage likewise uses
    // the runtime type). The old check, typeof(T).IsAssignableFrom(typeof(MessageTooBigEvent)), was also true for
    // any such base T and silently dropped every failure for it.
    private static bool IsMessageTooBigNotification<T>(Message<T> message)
    {
        return message.Event is MessageTooBigEvent
               || typeof(MessageTooBigEvent).IsAssignableFrom(typeof(T));
    }
}
}
using System;
using System.IdentityModel.Tokens;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.ServiceModel;
using System.Text.RegularExpressions;
using Anotar.Serilog;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Polly;
namespace RezCloud.Infrastructure.AzureBus
{
/// <summary>
/// Polly retry policies for Service Bus calls, with transient-error detection retro-fitted from the Transient Fault
/// Handling Application Block:
/// http://topaz.codeplex.com/SourceControl/latest#source/Source/TransientFaultHandling.ServiceBus/ServiceBusTransientErrorDetectionStrategy.cs
/// Referenced here: https://msdn.microsoft.com/en-us/library/dn440719(v=pandp.60).aspx
/// </summary>
/// <remarks>
/// Both policies handle only exceptions classified as transient (the exception itself or its immediate inner
/// exception), retrying with waits of 1, 2 and 3 seconds.
/// </remarks>
public class ServiceBusRetryStrategy
{
    // Matches ACS token-provider messages such as "Error:Code:500:SubCode:T9002:..."; group 1 is the status code.
    private static readonly Regex AcsErrorCodeRegex = new Regex(@"Error:Code:(\d+):SubCode:(\w\d+)",
        RegexOptions.IgnoreCase | RegexOptions.Compiled);

    // HTTP status codes treated as transient.
    private static readonly int[] HttpStatusCodes =
    {
        (int) HttpStatusCode.InternalServerError,
        (int) HttpStatusCode.GatewayTimeout,
        (int) HttpStatusCode.ServiceUnavailable,
        (int) HttpStatusCode.RequestTimeout
    };

    // WebException statuses treated as transient. Named so it does not shadow the System.Net.WebExceptionStatus type.
    private static readonly WebExceptionStatus[] TransientWebExceptionStatuses =
    {
        WebExceptionStatus.ConnectionClosed,
        WebExceptionStatus.Timeout,
        WebExceptionStatus.RequestCanceled
    };

    // Wait before each retry. Must be declared before the policies: static field initializers run in textual order,
    // and the policies are built from this array during type initialization.
    private static readonly TimeSpan[] RetryDelays =
    {
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(3)
    };

    // NOTE(review): both policies are left assignable (not readonly) in case callers or tests swap them out;
    // make them readonly if nothing does (CA2211).

    /// <summary>Synchronous retry policy for transient Service Bus errors.</summary>
    public static Policy RetryPolicy = Policy
        .Handle<Exception>(IsTransientAndLog)
        .WaitAndRetry(RetryDelays);

    /// <summary>Asynchronous retry policy for transient Service Bus errors.</summary>
    public static Policy RetryPolicyAsync = Policy
        .Handle<Exception>(IsTransientAndLog)
        .WaitAndRetryAsync(RetryDelays);

    // Shared Polly exception predicate: true when the exception or its immediate inner exception is transient.
    // Transient errors are logged as warnings because the policy will retry them while attempts remain.
    private static bool IsTransientAndLog(Exception ex)
    {
        var isTransientError = ex != null &&
                               (CheckIsTransient(ex) ||
                                ex.InnerException != null && CheckIsTransient(ex.InnerException));
        if (isTransientError)
        {
            LogTo.Warning(ex, "Transient error encountered while sending to the service bus");
        }
        return isTransientError;
    }

    // Classifies a single exception (not its inner exceptions). Order matters: several of the WCF types below derive
    // from CommunicationException, so the specific non-transient ones must be tested before the catch-all.
    private static bool CheckIsTransient(Exception ex)
    {
        if (ex == null) return false;

        var messagingException = ex as MessagingException;
        if (messagingException != null)
        {
            // The IsTransient property already covers the following scenarios:
            //if (ex is MessageLockLostException) return false;
            //if (ex is MessagingEntityAlreadyExistsException) return false;
            //if (ex is MessagingEntityNotFoundException) return false;
            //if (ex is MessagingCommunicationException) return true;
            //if (ex is ServerBusyException) return true;
            return messagingException.IsTransient;
        }

        if (ex is FaultException) return false;
        if (ex is CommunicationObjectFaultedException) return false;
        if (ex is TimeoutException) return true;

        var webException = ex as WebException;
        if (webException != null)
        {
            if (TransientWebExceptionStatuses.Contains(webException.Status)) return true;
            if (webException.Status == WebExceptionStatus.ProtocolError)
            {
                var response = webException.Response as HttpWebResponse;
                if (response != null && HttpStatusCodes.Contains((int) response.StatusCode)) return true;
            }
        }

        if (ex is SecurityTokenException) return true;
        if (ex is ServerTooBusyException) return true;
        if (ex is ServerErrorException) return true;
        if (ex is ProtocolException) return true;
        // This exception may occur when a listener and a consumer are being
        // initialized out of sync (e.g. consumer is reaching to a listener that
        // is still in the process of setting up the Service Host).
        if (ex is EndpointNotFoundException) return true;
        if (ex is CommunicationException) return true;

        var socketFault = ex as SocketException;
        if (socketFault != null)
        {
            return socketFault.SocketErrorCode == SocketError.TimedOut;
        }

        if (ex is UnauthorizedAccessException)
        {
            // Need to provide some resilience against the following fault that was seen a few times:
            // System.UnauthorizedAccessException: The token provider was unable to provide a security token while accessing 'https://xxx.accesscontrol.windows.net/WRAPv0.9/'.
            // Token provider returned message: 'Error:Code:500:SubCode:T9002:Detail:An internal network error occured. Please try again.'.
            // System.IdentityModel.Tokens.SecurityTokenException: The token provider was unable to provide a security token while accessing 'https://xxx.accesscontrol.windows.net/WRAPv0.9/'.
            // Token provider returned message: 'Error:Code:500:SubCode:T9002:Detail:An internal network error occured. Please try again.'.
            // System.Net.WebException: The remote server returned an error: (500) Internal Server Error.
            var match = AcsErrorCodeRegex.Match(ex.Message);
            var errorCode = 0;
            if (match.Success && match.Groups.Count > 1 && int.TryParse(match.Groups[1].Value, out errorCode))
            {
                return HttpStatusCodes.Contains(errorCode);
            }
        }

        return false;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment