Created
June 15, 2016 20:17
-
-
Save hyrmn/12a914f29643d4f020275a1ef2d95878 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Looks up (and creates on demand) partitioned queues and topics on the primary
/// Service Bus namespace. When a distinct secondary connection string is configured,
/// the supplied messaging factory is paired with the secondary namespace before the
/// first entity lookup.
/// </summary>
public class ServiceBus
{
    private readonly string _asbSecondaryUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStrSecondary");
    private readonly string _asbUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStr");
    private readonly bool _hasSecondary;
    private readonly bool _isSyphon;
    private readonly Lazy<MessagingFactory> _messagingFactory;
    private readonly NamingStrategy _namingStrategy =
        new NamingStrategy(TypedCloudConfigurationManager.GetSetting("azure.serviceBus.namingStrategy"));
    // Serializes the one-time namespace pairing so concurrent callers cannot pair twice.
    private readonly object _pairingLock = new object();
    private readonly Lazy<NamespaceManager> _primaryNamespace;
    // Only assigned when _hasSecondary is true; stays null otherwise.
    private readonly Lazy<NamespaceManager> _secondaryNamespace;

    /// <summary>
    /// Creates the helper around an existing (lazily created) messaging factory.
    /// </summary>
    /// <param name="messagingFactory">Factory that gets paired with the secondary namespace, if one is configured.</param>
    /// <param name="callerIsSyphon">Passed through as <c>enableSyphon</c> when pairing namespaces.</param>
    public ServiceBus(Lazy<MessagingFactory> messagingFactory, bool callerIsSyphon = false)
    {
        _isSyphon = callerIsSyphon;
        // A secondary that equals the primary is treated as "no secondary".
        _hasSecondary = _asbSecondaryUri.HasValue() && _asbSecondaryUri != _asbUri;
        _messagingFactory = messagingFactory;
        _primaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbUri));
        if (_hasSecondary)
        {
            _secondaryNamespace = new Lazy<NamespaceManager>(() => NamespaceManager.CreateFromConnectionString(_asbSecondaryUri));
        }
    }

    /// <summary>
    /// Returns the description of the partitioned queue for <paramref name="queue"/>,
    /// creating the queue on the primary namespace when it does not exist yet.
    /// </summary>
    /// <param name="queue">Logical queue; its entity name comes from the naming strategy.</param>
    /// <returns>The existing or newly created queue description.</returns>
    public QueueDescription GetOrCreateQueue(Queues queue)
    {
        EnsureNamespacesPaired();

        var partitionedName = _namingStrategy.GetPartitionedName(queue);
        var primary = _primaryNamespace.Value;
        if (primary.QueueExists(partitionedName))
        {
            return primary.GetQueue(partitionedName);
        }

        try
        {
            return primary.CreateQueue(new QueueDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Another caller created the queue between the existence check and the
            // create call; the entity is there now, so just fetch it.
            return primary.GetQueue(partitionedName);
        }
    }

    /// <summary>
    /// Returns the description of the partitioned topic for <paramref name="topic"/>,
    /// creating the topic on the primary namespace when it does not exist yet.
    /// </summary>
    /// <param name="topic">Logical topic; its entity name comes from the naming strategy.</param>
    /// <returns>The existing or newly created topic description.</returns>
    public TopicDescription GetOrCreateTopic(Topics topic)
    {
        EnsureNamespacesPaired();

        var partitionedName = _namingStrategy.GetPartitionedName(topic);
        var primary = _primaryNamespace.Value;
        if (primary.TopicExists(partitionedName))
        {
            return primary.GetTopic(partitionedName);
        }

        try
        {
            return primary.CreateTopic(new TopicDescription(partitionedName) {EnablePartitioning = true});
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // Lost the creation race to another caller; the topic exists now.
            return primary.GetTopic(partitionedName);
        }
    }

    /// <summary>
    /// Runs namespace pairing at most once at a time. The secondary namespace manager
    /// is first materialized inside <see cref="PairNamespace"/>, so its
    /// <c>IsValueCreated</c> flag doubles as the "pairing already attempted" marker.
    /// </summary>
    private void EnsureNamespacesPaired()
    {
        if (!_hasSecondary || _secondaryNamespace.IsValueCreated)
        {
            return;
        }

        lock (_pairingLock)
        {
            // Re-check under the lock: a concurrent caller may have paired while we waited.
            if (!_secondaryNamespace.IsValueCreated)
            {
                PairNamespace();
            }
        }
    }

    /// <summary>
    /// Pairs the messaging factory with the secondary namespace and logs the outcome.
    /// Failures are logged as fatal and swallowed, so entity lookups still proceed
    /// against the primary namespace.
    /// </summary>
    private void PairNamespace()
    {
        try
        {
            var secondaryFactory = MessagingFactory.CreateFromConnectionString(_asbSecondaryUri);
            var pairingOptions = new SendAvailabilityPairedNamespaceOptions(
                secondaryNamespaceManager: _secondaryNamespace.Value,
                messagingFactory: secondaryFactory,
                backlogQueueCount: 10,
                failoverInterval: TimeSpan.Zero,
                enableSyphon: _isSyphon);

            // Callers of this class are synchronous, so block until pairing finishes.
            _messagingFactory.Value.PairNamespaceAsync(pairingOptions).Wait();

            if (pairingOptions.BacklogQueueCount < 1)
            {
                LogTo.Fatal("Pairing service bus namespaces did not create any backlog queues! We're running without suspenders here!");
            }
            else
            {
                LogTo.Information("Successfully paired service bus namespaces. Backlog Queue Count: {BacklogQueueCount}", pairingOptions.BacklogQueueCount);
            }
        }
        catch (Exception ex)
        {
            LogTo.Fatal(ex, "Error while trying to pair namespaces");
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Sends messages to Service Bus queues and publishes them to topics. Queue and topic
/// clients are created on first use and cached per entity name. Sends are
/// fire-and-forget: failures are logged, not thrown to the caller.
/// </summary>
public class ServiceBusDispatcher
{
    private readonly ConcurrentDictionary<string, QueueClient> _queues = new ConcurrentDictionary<string, QueueClient>();
    private readonly ConcurrentDictionary<string, TopicClient> _topics = new ConcurrentDictionary<string, TopicClient>();
    private readonly Lazy<MessagingFactory> _messagingFactory;
    private readonly ServiceBus _serviceBus;
    private readonly string _asbUri = TypedCloudConfigurationManager.GetSetting("azure.serviceBus.connStr");

    /// <summary>
    /// Sets up a lazily created messaging factory for the primary connection string
    /// and the <see cref="ServiceBus"/> helper used to resolve entities.
    /// </summary>
    public ServiceBusDispatcher()
    {
        _messagingFactory = new Lazy<MessagingFactory>(() => MessagingFactory.CreateFromConnectionString(_asbUri));
        _serviceBus = new ServiceBus(_messagingFactory, callerIsSyphon: false);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to <paramref name="queue"/> through the retry
    /// policy without waiting for completion. A failed send is logged as fatal.
    /// </summary>
    /// <typeparam name="T">Event payload type.</typeparam>
    /// <param name="message">Message whose event, headers and ids are put on the wire.</param>
    /// <param name="queue">Destination queue.</param>
    public void Send<T>(Message<T> message, Queues queue)
    {
        LogTo.Debug("Sending message {0} to queue {1}", message.StreamId, queue.Value);
        var queueClient = _queues.GetOrAdd(queue.Value, key => _messagingFactory.Value.CreateQueueClient(_serviceBus.GetOrCreateQueue(queue).Path));
        var msg = CreateBrokeredMessage(message);
        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => queueClient.SendAsync(msg))
            .ContinueWith(task =>
            {
                try
                {
                    if (task.Exception != null)
                    {
                        message.Log()
                            .Fatal(task.Exception.Flatten(),
                                "Unable to post {Message} to queue {Topic} on {ServiceBusAddress}",
                                typeof(T),
                                queueClient.Path,
                                queueClient.MessagingFactory.Address);
                    }
                }
                finally
                {
                    // The send (including all retries) is over; release the message
                    // and the body stream it owns.
                    msg.Dispose();
                }
            });
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to <paramref name="topic"/> through the
    /// retry policy without waiting for completion. A failed publish is logged as fatal.
    /// </summary>
    /// <typeparam name="T">Event payload type.</typeparam>
    /// <param name="message">Message whose event, headers and ids are put on the wire.</param>
    /// <param name="topic">Destination topic.</param>
    public void Publish<T>(Message<T> message, Topics topic)
    {
        LogTo.Debug("Publishing message {0} to topic {1}", message.StreamId, topic.Value);
        var topicClient = _topics.GetOrAdd(topic.Value, key => _messagingFactory.Value.CreateTopicClient(_serviceBus.GetOrCreateTopic(topic).Path));
        var msg = CreateBrokeredMessage(message);
        ServiceBusRetryStrategy.RetryPolicyAsync
            .ExecuteAsync(() => topicClient.SendAsync(msg))
            .ContinueWith(task =>
            {
                try
                {
                    if (task.Exception != null)
                    {
                        message.Log()
                            .Fatal(task.Exception.Flatten(),
                                "Unable to post {Message} to topic {Topic} on {ServiceBusAddress}",
                                typeof(T),
                                topicClient.Path,
                                topicClient.MessagingFactory.Address);
                    }
                }
                finally
                {
                    // The publish (including all retries) is over; release the message
                    // and the body stream it owns.
                    msg.Dispose();
                }
            });
    }

    /// <summary>
    /// Wraps the event of <paramref name="message"/> in a <see cref="BrokeredMessage"/>:
    /// the serialized event becomes the body, type information and the message headers
    /// become properties, and commit/stream ids become correlation/message ids.
    /// </summary>
    /// <typeparam name="T">Event payload type.</typeparam>
    /// <param name="message">Source message.</param>
    /// <returns>A new brokered message that owns its body stream; the caller must dispose it.</returns>
    private static BrokeredMessage CreateBrokeredMessage<T>(Message<T> message)
    {
        // Use the runtime type of the event, which may be more derived than T.
        var messageType = message.Event.GetType();
        var eventStream = ServiceBusSerializer.SerailizeToStream(message.Event);
        var msg = new BrokeredMessage(eventStream, ownsStream: true)
        {
            ContentType = messageType.AssemblyQualifiedName,
            Properties =
            {
                {"MessageType", messageType.FullName},
                {"AssemblyName", messageType.AssemblyQualifiedName}
            }
        };

        // Headers are applied after the type properties, so a header with the same
        // key replaces the value set above.
        foreach (var kv in message.Headers)
        {
            msg.Properties[kv.Key] = kv.Value;
        }

        msg.CorrelationId = message.CommitId;
        msg.MessageId = message.StreamId;
        return msg;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Makes sure the "RezCloud.OtaWorker" subscription exists on the reporting topic and
/// stores a peek-lock subscription client for it in <c>_otaReportingTopicClient</c>.
/// </summary>
/// <param name="namespaceManager">Namespace manager used to check for and create the subscription.</param>
/// <param name="messagingFactory">Factory used to create the subscription client.</param>
private void SubscribeToSomeTopic(Lazy<NamespaceManager> namespaceManager, Lazy<MessagingFactory> messagingFactory)
{
    const string subscriptionName = "RezCloud.OtaWorker";

    var topicPath = _serviceBus.GetOrCreateTopic(Topics.Reporting).Path;
    var manager = namespaceManager.Value;

    var subscriptionMissing = !manager.SubscriptionExists(topicPath, subscriptionName);
    if (subscriptionMissing)
    {
        manager.CreateSubscription(topicPath, subscriptionName);
    }

    var factory = messagingFactory.Value;
    _otaReportingTopicClient = factory.CreateSubscriptionClient(topicPath, subscriptionName, ReceiveMode.PeekLock);
}
/// <summary>
/// Resolves (creating if needed) the OTA queue and stores a peek-lock queue client
/// for it in <c>_otaQueueClient</c>.
/// </summary>
/// <param name="messagingFactory">Factory used to create the queue client.</param>
private void HookupQueueClient(Lazy<MessagingFactory> messagingFactory)
{
    var otaQueuePath = _serviceBus.GetOrCreateQueue(Queues.Ota).Path;
    var factory = messagingFactory.Value;
    _otaQueueClient = factory.CreateQueueClient(otaQueuePath, ReceiveMode.PeekLock);
}
/// <summary>
/// Registers the message handlers on the reporting subscription client and the OTA
/// queue client. Both route every received message to
/// <c>_otaMessageDispatcher.Process</c> and share the same options: one concurrent
/// call, auto-complete enabled, and <see cref="OnException"/> as the exception handler.
/// </summary>
public void Run()
{
    var pumpOptions = new OnMessageOptions();
    pumpOptions.MaxConcurrentCalls = 1;
    pumpOptions.AutoComplete = true;
    pumpOptions.ExceptionReceived += OnException;

    // Subscription first, then queue.
    _otaReportingTopicClient.OnMessageAsync(
        async topicMessage => await _otaMessageDispatcher.Process(topicMessage),
        pumpOptions);
    _otaQueueClient.OnMessageAsync(
        async queueMessage => await _otaMessageDispatcher.Process(queueMessage),
        pumpOptions);
}
/// <summary>
/// Closes the reporting subscription client and then the OTA queue client, skipping
/// any that were never created or are already closed.
/// </summary>
public void Stop()
{
    var clients = new MessageClientEntity[] {_otaReportingTopicClient, _otaQueueClient};
    foreach (var client in clients)
    {
        CloseClient(client);
    }
}
/// <summary>
/// Closes <paramref name="client"/> unless it is null or already closed.
/// </summary>
/// <param name="client">Client to close; may be null.</param>
private void CloseClient(MessageClientEntity client)
{
    // Nothing to do for a client that was never created or is already closed.
    if (client == null || client.IsClosed)
    {
        return;
    }

    client.Close();
}
/// <summary>
/// Exception handler for the message pumps. Logs the exception as an error together
/// with the pump action and the sender, unless there is no exception or it is an
/// <c>AlreadyLoggedException</c>.
/// </summary>
/// <param name="sender">Object that raised the event.</param>
/// <param name="e">Event data carrying the exception and the action that was running.</param>
private void OnException(object sender, ExceptionReceivedEventArgs e)
{
    var failure = e.Exception;

    // Skip empty notifications and exceptions that were already logged elsewhere.
    var nothingToLog = failure == null || failure is AlreadyLoggedException;
    if (!nothingToLog)
    {
        LogTo.Error(failure, "Error processing message. Action: {action}, Sender: {@sender}", e.Action, sender);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment