Skip to content

Instantly share code, notes, and snippets.

@hyrmn
Created June 15, 2016 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hyrmn/12a914f29643d4f020275a1ef2d95878 to your computer and use it in GitHub Desktop.
Save hyrmn/12a914f29643d4f020275a1ef2d95878 to your computer and use it in GitHub Desktop.
/// <summary>
/// Resolves the partitioned queues and topics on the primary Service Bus namespace, creating them
/// when they do not exist. When a secondary connection string is configured and differs from the
/// primary one, the supplied messaging factory is paired with the secondary namespace before the
/// first entity lookup.
/// </summary>
public class ServiceBus
{
    private readonly string _asbSecondaryUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStrSecondary");
    private readonly string _asbUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStr");
    private readonly bool _hasSecondary;
    private readonly bool _isSyphon;
    private readonly Lazy<MessagingFactory> _messagingFactory;
    private readonly NamingStrategy _namingStrategy =
        new NamingStrategy(TypedCloudConfigurationManager.GetSetting("azure.serviceBus.namingStrategy"));
    // Serializes the "has pairing been attempted?" check so concurrent callers cannot pair twice.
    private readonly object _pairingGate = new object();
    private readonly Lazy<NamespaceManager> _primaryNamespace;
    private readonly Lazy<NamespaceManager> _secondaryNamespace;

    /// <summary>
    /// Creates the helper around the caller's messaging factory.
    /// </summary>
    /// <param name="messagingFactory">Lazily created factory that gets paired with the secondary namespace.</param>
    /// <param name="callerIsSyphon">Forwarded as <c>enableSyphon</c> to the pairing options.</param>
    public ServiceBus(Lazy<MessagingFactory> messagingFactory, bool callerIsSyphon = false)
    {
        _isSyphon = callerIsSyphon;
        // A secondary that is missing or identical to the primary means there is nothing to pair with.
        _hasSecondary = _asbSecondaryUri.HasValue() && _asbSecondaryUri != _asbUri;
        _messagingFactory = messagingFactory;
        _primaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbUri));
        if (_hasSecondary)
        {
            _secondaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbSecondaryUri));
        }
    }

    /// <summary>
    /// Returns the description of the partitioned queue for <paramref name="queue"/>, creating the
    /// queue (with partitioning enabled) on the primary namespace when it does not exist yet.
    /// </summary>
    /// <param name="queue">Logical queue; its physical name comes from the naming strategy.</param>
    /// <returns>The description of the existing or newly created queue.</returns>
    public QueueDescription GetOrCreateQueue(Queues queue)
    {
        EnsurePaired();
        var partitionedName = _namingStrategy.GetPartitionedName(queue);
        var primary = _primaryNamespace.Value;
        if (primary.QueueExists(partitionedName))
        {
            return primary.GetQueue(partitionedName);
        }
        try
        {
            return primary.CreateQueue(new QueueDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Someone else created the queue between the existence check and the create call.
            return primary.GetQueue(partitionedName);
        }
    }

    /// <summary>
    /// Returns the description of the partitioned topic for <paramref name="topic"/>, creating the
    /// topic (with partitioning enabled) on the primary namespace when it does not exist yet.
    /// </summary>
    /// <param name="topic">Logical topic; its physical name comes from the naming strategy.</param>
    /// <returns>The description of the existing or newly created topic.</returns>
    public TopicDescription GetOrCreateTopic(Topics topic)
    {
        EnsurePaired();
        var partitionedName = _namingStrategy.GetPartitionedName(topic);
        var primary = _primaryNamespace.Value;
        if (primary.TopicExists(partitionedName))
        {
            return primary.GetTopic(partitionedName);
        }
        try
        {
            return primary.CreateTopic(new TopicDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Someone else created the topic between the existence check and the create call.
            return primary.GetTopic(partitionedName);
        }
    }

    /// <summary>
    /// Runs the namespace pairing when a secondary is configured and its namespace manager has not
    /// been materialized yet. The check and the pairing happen under one lock so that concurrent
    /// callers wait for the in-flight attempt instead of starting a second one.
    /// </summary>
    private void EnsurePaired()
    {
        if (!_hasSecondary)
        {
            return;
        }
        lock (_pairingGate)
        {
            // PairNamespace materializes _secondaryNamespace, which is what marks pairing as attempted.
            if (!_secondaryNamespace.IsValueCreated)
            {
                PairNamespace();
            }
        }
    }

    /// <summary>
    /// Pairs the messaging factory with the secondary namespace and logs the outcome. Failures are
    /// logged and swallowed so that callers can continue against the primary namespace alone.
    /// </summary>
    private void PairNamespace()
    {
        try
        {
            var secondaryFactory = MessagingFactory.CreateFromConnectionString(_asbSecondaryUri);
            var pairingOptions = new SendAvailabilityPairedNamespaceOptions(
                secondaryNamespaceManager: _secondaryNamespace.Value,
                messagingFactory: secondaryFactory,
                backlogQueueCount: 10,
                failoverInterval: TimeSpan.Zero,
                enableSyphon: _isSyphon);
            // GetAwaiter().GetResult() rethrows the original exception rather than an
            // AggregateException wrapper, so the log entry below carries the real failure.
            _messagingFactory.Value.PairNamespaceAsync(pairingOptions).GetAwaiter().GetResult();
            if (pairingOptions.BacklogQueueCount < 1)
            {
                LogTo.Fatal("Pairing service bus namespaces did not create any backlog queues! We're running without suspenders here!");
            }
            else
            {
                LogTo.Information("Successfully paired service bus namespaces. Backlog Queue Count: {BacklogQueueCount}", pairingOptions.BacklogQueueCount);
            }
        }
        catch (Exception ex)
        {
            LogTo.Fatal(ex, "Error while trying to pair namespaces");
        }
    }
}
/// <summary>
/// Sends messages to Service Bus queues and topics. Queue and topic clients are created on first
/// use and cached per entity name. Sends are fire-and-forget: the send task is not returned to the
/// caller, and a failure is reported through the log only.
/// </summary>
public class ServiceBusDispatcher
{
    private readonly ConcurrentDictionary<string, QueueClient> _queues = new ConcurrentDictionary<string, QueueClient>();
    private readonly ConcurrentDictionary<string, TopicClient> _topics = new ConcurrentDictionary<string, TopicClient>();
    private readonly Lazy<MessagingFactory> _messagingFactory;
    private readonly ServiceBus _serviceBus;
    private readonly string _asbUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStr");

    /// <summary>
    /// Builds the dispatcher on a lazily created messaging factory for the primary connection string.
    /// </summary>
    public ServiceBusDispatcher()
    {
        _messagingFactory = new Lazy<MessagingFactory>(() => MessagingFactory.CreateFromConnectionString(_asbUri));
        _serviceBus = new ServiceBus(_messagingFactory, callerIsSyphon: false);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to <paramref name="queue"/> through the retry policy.
    /// </summary>
    /// <param name="message">Message whose event becomes the brokered message body.</param>
    /// <param name="queue">Destination queue; it is created on first use if missing.</param>
    public void Send<T>(Message<T> message, Queues queue)
    {
        LogTo.Debug("Sending message {0} to queue {1}", message.StreamId, queue.Value);
        var queueClient = _queues.GetOrAdd(queue.Value, key => _messagingFactory.Value.CreateQueueClient(_serviceBus.GetOrCreateQueue(queue).Path));
        var msg = CreateBrokeredMessage(message);
        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => queueClient.SendAsync(msg))
            .ContinueWith(task =>
            {
                try
                {
                    if (task.Exception != null)
                    {
                        message.Log()
                            .Fatal(task.Exception.Flatten(),
                                "Unable to post {Message} to queue {Topic} on {ServiceBusAddress}",
                                typeof(T),
                                queueClient.Path,
                                queueClient.MessagingFactory.Address);
                    }
                }
                finally
                {
                    // The send (including retries) is over; release the message and the body stream it owns.
                    msg.Dispose();
                }
            });
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/> through the retry policy.
    /// </summary>
    /// <param name="message">Message whose event becomes the brokered message body.</param>
    /// <param name="topic">Destination topic; it is created on first use if missing.</param>
    public void Publish<T>(Message<T> message, Topics topic)
    {
        LogTo.Debug("Publishing message {0} to topic {1}", message.StreamId, topic.Value);
        var topicClient = _topics.GetOrAdd(topic.Value, key => _messagingFactory.Value.CreateTopicClient(_serviceBus.GetOrCreateTopic(topic).Path));
        var msg = CreateBrokeredMessage(message);
        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => topicClient.SendAsync(msg))
            .ContinueWith(task =>
            {
                try
                {
                    if (task.Exception != null)
                    {
                        message.Log()
                            .Fatal(task.Exception.Flatten(),
                                "Unable to post {Message} to topic {Topic} on {ServiceBusAddress}",
                                typeof(T),
                                topicClient.Path,
                                topicClient.MessagingFactory.Address);
                    }
                }
                finally
                {
                    // The send (including retries) is over; release the message and the body stream it owns.
                    msg.Dispose();
                }
            });
    }

    /// <summary>
    /// Wraps the message's event in a <see cref="BrokeredMessage"/>: the serialized event is the
    /// body, the runtime type of the event is recorded in the content type and properties, the
    /// message headers are copied into the properties, and the commit/stream ids become the
    /// correlation/message ids.
    /// </summary>
    /// <param name="message">Source message.</param>
    /// <returns>A brokered message that owns its body stream; the caller must dispose it.</returns>
    private static BrokeredMessage CreateBrokeredMessage<T>(Message<T> message)
    {
        var messageType = message.Event.GetType();
        var eventStream = ServiceBusSerializer.SerailizeToStream(message.Event);
        var msg = new BrokeredMessage(eventStream, ownsStream: true)
        {
            ContentType = messageType.AssemblyQualifiedName,
            Properties =
            {
                {"MessageType", messageType.FullName},
                {"AssemblyName", messageType.AssemblyQualifiedName}
            }
        };
        // Headers are applied last, so a header with a matching key overrides the defaults above.
        foreach (var kv in message.Headers)
        {
            msg.Properties[kv.Key] = kv.Value;
        }
        msg.CorrelationId = message.CommitId;
        msg.MessageId = message.StreamId;
        return msg;
    }
}
/// <summary>
/// Makes sure the "RezCloud.OtaWorker" subscription exists on the reporting topic and stores a
/// peek-lock subscription client for it in <c>_otaReportingTopicClient</c>.
/// </summary>
/// <param name="namespaceManager">Manager used to check for and create the subscription.</param>
/// <param name="messagingFactory">Factory used to create the subscription client.</param>
private void SubscribeToSomeTopic(Lazy<NamespaceManager> namespaceManager, Lazy<MessagingFactory> messagingFactory)
{
    const string subscriptionName = "RezCloud.OtaWorker";
    var topicDescription = _serviceBus.GetOrCreateTopic(Topics.Reporting);
    var manager = namespaceManager.Value;
    if (!manager.SubscriptionExists(topicDescription.Path, subscriptionName))
    {
        try
        {
            manager.CreateSubscription(topicDescription.Path, subscriptionName);
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Another instance created the subscription between the check and the create call;
            // it exists now, which is all that is needed here.
        }
    }
    _otaReportingTopicClient = messagingFactory.Value.CreateSubscriptionClient(topicDescription.Path, subscriptionName, ReceiveMode.PeekLock);
}
/// <summary>
/// Resolves the OTA queue (creating it if needed) and stores a peek-lock client for it in
/// <c>_otaQueueClient</c>.
/// </summary>
/// <param name="messagingFactory">Factory used to create the queue client.</param>
private void HookupQueueClient(Lazy<MessagingFactory> messagingFactory)
{
    // Resolve the queue first so the factory is only materialized once the entity is known to exist.
    var otaQueue = _serviceBus.GetOrCreateQueue(Queues.Ota);
    var factory = messagingFactory.Value;
    _otaQueueClient = factory.CreateQueueClient(otaQueue.Path, ReceiveMode.PeekLock);
}
/// <summary>
/// Starts the message pumps on the reporting subscription client and the OTA queue client.
/// Both pumps share one options object (single concurrent call, auto-complete) and hand every
/// received message to <c>_otaMessageDispatcher</c>; pump errors go to <see cref="OnException"/>.
/// </summary>
public void Run()
{
    var pumpOptions = new OnMessageOptions();
    pumpOptions.MaxConcurrentCalls = 1;
    pumpOptions.AutoComplete = true;
    pumpOptions.ExceptionReceived += OnException;

    _otaReportingTopicClient.OnMessageAsync(
        async topicMessage => await _otaMessageDispatcher.Process(topicMessage),
        pumpOptions);
    _otaQueueClient.OnMessageAsync(
        async queueMessage => await _otaMessageDispatcher.Process(queueMessage),
        pumpOptions);
}
/// <summary>
/// Closes the reporting subscription client and then the OTA queue client.
/// </summary>
public void Stop()
{
    // Same order as the pumps were started: subscription client first, queue client second.
    var clients = new MessageClientEntity[] {_otaReportingTopicClient, _otaQueueClient};
    foreach (var client in clients)
    {
        CloseClient(client);
    }
}
/// <summary>
/// Closes <paramref name="client"/> unless it is null or already closed.
/// </summary>
/// <param name="client">Client to close; may be null.</param>
private void CloseClient(MessageClientEntity client)
{
    if (client == null || client.IsClosed)
    {
        return;
    }
    client.Close();
}
/// <summary>
/// Handler for the message pump's exception event. Logs the exception together with the pump
/// action and the sender, except when there is no exception or it is an
/// <c>AlreadyLoggedException</c> (presumably logged where it was raised; confirm against its throw sites).
/// </summary>
/// <param name="sender">Object that raised the event; included in the log entry.</param>
/// <param name="e">Event data carrying the exception and the action that failed.</param>
private void OnException(object sender, ExceptionReceivedEventArgs e)
{
    var error = e.Exception;
    var shouldLog = error != null && !(error is AlreadyLoggedException);
    if (shouldLog)
    {
        LogTo.Error(error, "Error processing message. Action: {action}, Sender: {@sender}", e.Action, sender);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment