|
using System; |
|
using System.Configuration; |
|
using System.Globalization; |
|
using System.IO; |
|
using System.Linq; |
|
using System.Threading; |
|
using System.Xml.Serialization; |
|
|
|
using EPiServer.DataAbstraction; |
|
using EPiServer.Logging; |
|
using EPiServer.Reference.Commerce.Site.Features.Shared.Jobs; |
|
using EPiServer.ServiceLocation; |
|
|
|
using Mediachase.Commerce.Orders; |
|
|
|
using Microsoft.ServiceBus; |
|
using Microsoft.ServiceBus.Messaging; |
|
|
|
/// <summary>
/// Publishes carts to the Service Bus topic <c>Orders</c> and drains the
/// <c>OrderSubscription</c> subscription, saving every received cart as a purchase order.
/// </summary>
/// <remarks>
/// Registered as a singleton, so no per-call state is kept in fields: every send or
/// receive operation creates and closes its own Service Bus client.
/// </remarks>
[ServiceConfiguration(typeof(IMessagingService), Lifecycle = ServiceInstanceScope.Singleton)]
public class MessagingService : IMessagingService
{
    private const string TopicName = "Orders";

    private const string AgentSubscriptionName = "OrderSubscription";

    private const string ConnectionStringKey = "Microsoft.ServiceBus.ConnectionString";

    /// <summary>Total number of send attempts (first try included) made for one cart.</summary>
    private const int MaxSendAttempts = 3;

    /// <summary>Number of back-to-back transient errors after which a receive run gives up.</summary>
    private const int MaxConsecutiveTransientErrors = 5;

    /// <summary>Messages expire after one week (168 hours).</summary>
    private static readonly TimeSpan MessageTimeToLive = TimeSpan.FromDays(7);

    /// <summary>How long a single receive call waits for a message.</summary>
    private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Pause applied after a transient error before the operation is retried.</summary>
    private static readonly TimeSpan TransientBackOff = TimeSpan.FromSeconds(2);

    private static readonly ILogger Log = LogManager.GetLogger();

    private readonly ScheduledJobRepository scheduledJobRepository;

    // Stays null when the configuration check fails; it is only used by the
    // constructor-time helpers, which run after that check has passed.
    private readonly NamespaceManager namespaceManager;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessagingService"/> class, creating the
    /// topic and subscription when the connection string has been configured.
    /// </summary>
    /// <param name="scheduledJobRepository">Repository used to look up the messaging scheduled job.</param>
    public MessagingService(ScheduledJobRepository scheduledJobRepository)
    {
        this.scheduledJobRepository = scheduledJobRepository;

        // Validate the setting before handing it to the Service Bus SDK, so that a missing
        // or placeholder connection string produces the log message below instead of an
        // SDK failure. MessagingEnabled keeps its default (false) in that case.
        if (!VerifyConfiguration())
        {
            return;
        }

        this.namespaceManager = NamespaceManager.Create();

        this.CreateTopicAndSubscription();

        this.MessagingEnabled = this.CheckMessagingActive();
    }

    /// <summary>
    /// Gets a value indicating whether messaging is enabled, i.e. the connection string is
    /// configured and both the topic and the subscription exist.
    /// </summary>
    /// <value><c>true</c> if messaging is enabled; otherwise, <c>false</c>.</value>
    public bool MessagingEnabled { get; }

    /// <summary>
    /// Gets a value indicating whether the messaging scheduled job exists and is enabled.
    /// The repository is queried on every read.
    /// </summary>
    /// <value><c>true</c> if the messaging scheduled job is active; otherwise, <c>false</c>.</value>
    public bool MessagingScheduledJobActive => this.CheckScheduledJobActive();

    /// <summary>
    /// Serializes the cart and sends it to the topic.
    /// </summary>
    /// <param name="cart">The cart.</param>
    /// <returns>
    /// <c>true</c> when the message was sent; <c>false</c> when messaging is disabled or the
    /// send failed (the failure is logged).
    /// </returns>
    public bool SendMessage(Cart cart)
    {
        if (!this.MessagingEnabled)
        {
            return false;
        }

        // One client per call: this service is a singleton, so a client shared through a
        // field could be closed by one caller while another caller is still sending.
        TopicClient client = TopicClient.Create(TopicName);

        try
        {
            string payload = Serialize(cart);

            for (int attempt = 1; attempt <= MaxSendAttempts; attempt++)
            {
                try
                {
                    // A fresh message per attempt; a message that went through a send
                    // attempt is not reused.
                    using (BrokeredMessage message = new BrokeredMessage(payload) { TimeToLive = MessageTimeToLive })
                    {
                        client.Send(message);
                        return true;
                    }
                }
                catch (MessagingException messagingException)
                {
                    // Permanent errors, and transient ones once the attempts are used up,
                    // end the operation without a pointless back-off.
                    if (!messagingException.IsTransient || attempt == MaxSendAttempts)
                    {
                        Log.Error(messagingException.Message, messagingException);
                        return false;
                    }

                    HandleTransientErrors(messagingException);
                }
            }

            return false;
        }
        catch (Exception exception)
        {
            // Covers serialization failures and any non-messaging error raised while sending.
            Log.Error(exception.Message, exception);
            return false;
        }
        finally
        {
            client.Close();
        }
    }

    /// <summary>
    /// Receives messages from the subscription until it is empty, saving each cart as a
    /// purchase order and completing the message.
    /// </summary>
    /// <returns>A status text containing the number of processed and failed messages.</returns>
    /// <remarks>
    /// The run stops early when the receive call itself fails with a non-transient error, or
    /// after <see cref="MaxConsecutiveTransientErrors"/> transient errors in a row.
    /// </remarks>
    public string ReceiveMessages()
    {
        if (!this.MessagingEnabled)
        {
            return "Messaging not enabled.";
        }

        SubscriptionClient agentSubscriptionClient = SubscriptionClient.Create(TopicName, AgentSubscriptionName);

        int messageCount = 0;
        int failedCount = 0;
        int consecutiveTransientErrors = 0;

        try
        {
            bool keepReceiving = true;

            while (keepReceiving)
            {
                BrokeredMessage message = null;

                try
                {
                    // Receive the next message from the agent subscription.
                    message = agentSubscriptionClient.Receive(ReceiveTimeout);

                    if (message == null)
                    {
                        // No more messages in the subscription.
                        break;
                    }

                    string messageBody = message.GetBody<string>();
                    Cart cart = DeSerialize(messageBody);
                    cart.SaveAsPurchaseOrder();

                    message.Complete();

                    messageCount += 1;

                    // Only a fully processed message proves the connection is healthy again.
                    consecutiveTransientErrors = 0;
                }
                catch (MessagingException messagingException)
                {
                    if (messagingException.IsTransient)
                    {
                        consecutiveTransientErrors += 1;

                        if (consecutiveTransientErrors >= MaxConsecutiveTransientErrors)
                        {
                            Log.Error(
                                string.Format(
                                    CultureInfo.InvariantCulture,
                                    "Giving up after {0} consecutive transient errors: {1}",
                                    consecutiveTransientErrors,
                                    messagingException.Message),
                                messagingException);

                            keepReceiving = false;
                        }
                        else
                        {
                            // Back off, then let the loop try again.
                            HandleTransientErrors(messagingException);
                        }
                    }
                    else
                    {
                        Log.Error(messagingException.Message, messagingException);

                        failedCount += 1;

                        keepReceiving = ReleaseFailedMessage(message);
                    }
                }
                catch (Exception exception)
                {
                    Log.Error(exception.Message, exception);

                    failedCount += 1;

                    keepReceiving = ReleaseFailedMessage(message);
                }
                finally
                {
                    message?.Dispose();
                }
            }
        }
        finally
        {
            agentSubscriptionClient.Close();
        }

        return string.Format(CultureInfo.InvariantCulture, "Processed {0} orders from queue. {1} failed to process.", messageCount, failedCount);
    }

    /// <summary>
    /// Unlocks a message that could not be processed so that it can be delivered again.
    /// </summary>
    /// <param name="message">The message that failed, or <c>null</c> when no message was received.</param>
    /// <returns>
    /// <c>true</c> when receiving may continue; <c>false</c> when there was no message, which
    /// means the receive call itself failed and repeating it would fail the same way.
    /// </returns>
    private static bool ReleaseFailedMessage(BrokeredMessage message)
    {
        if (message == null)
        {
            return false;
        }

        try
        {
            message.Abandon();
        }
        catch (Exception exception)
        {
            // Best effort only: a failure to abandon is logged and the run continues.
            Log.Error(exception.Message, exception);
        }

        return true;
    }

    /// <summary>
    /// Looks up the scheduled job whose type name matches <see cref="MessagingScheduledJob"/>.
    /// </summary>
    /// <returns><c>true</c> when that job exists and is enabled; otherwise, <c>false</c>.</returns>
    private bool CheckScheduledJobActive()
    {
        Type jobType = typeof(MessagingScheduledJob);

        ScheduledJob messagingScheduledJob =
            this.scheduledJobRepository.List().SingleOrDefault(x => x.TypeName == jobType.FullName);

        return messagingScheduledJob != null && messagingScheduledJob.IsEnabled;
    }

    /// <summary>
    /// Checks that the Service Bus connection string app setting is present and no longer
    /// contains the template placeholders.
    /// </summary>
    /// <returns><c>true</c> when the setting looks usable; otherwise, <c>false</c> (an error is logged).</returns>
    private static bool VerifyConfiguration()
    {
        string connectionString = ConfigurationManager.AppSettings[ConnectionStringKey];

        // AppSettings yields null for a missing key; treat that like an unconfigured value.
        bool configured = !string.IsNullOrWhiteSpace(connectionString)
            && !connectionString.Contains("[your namespace]")
            && !connectionString.Contains("[your access key]");

        if (configured)
        {
            return true;
        }

        Log.Error(
            "Please update the 'Microsoft.ServiceBus.ConnectionString' appSetting in app.config to specify your Service Bus namespace and secret key.");

        return false;
    }

    /// <summary>
    /// Checks that both the topic and the subscription exist.
    /// </summary>
    /// <returns>
    /// <c>true</c> when both exist; <c>false</c> when either is missing or the check fails
    /// with a <see cref="MessagingException"/> (which is logged).
    /// </returns>
    private bool CheckMessagingActive()
    {
        try
        {
            return this.namespaceManager.TopicExists(TopicName)
                && this.namespaceManager.SubscriptionExists(this.namespaceManager.GetTopic(TopicName).Path, AgentSubscriptionName);
        }
        catch (MessagingException e)
        {
            // Same policy as CreateTopicAndSubscription: a Service Bus failure disables
            // messaging instead of failing construction of the service.
            Log.Error(e.Message, e);
            return false;
        }
    }

    /// <summary>
    /// Creates the topic and the subscription when they do not exist yet. A
    /// <see cref="MessagingException"/> is logged and not rethrown.
    /// </summary>
    private void CreateTopicAndSubscription()
    {
        try
        {
            Log.Information("Creating or getting Topic {0}", TopicName);

            TopicDescription orderTopic = !this.namespaceManager.TopicExists(TopicName)
                ? this.namespaceManager.CreateTopic(TopicName)
                : this.namespaceManager.GetTopic(TopicName);

            if (this.namespaceManager.SubscriptionExists(orderTopic.Path, AgentSubscriptionName))
            {
                return;
            }

            Log.Information("Creating Subscription '{0}'", AgentSubscriptionName);
            this.namespaceManager.CreateSubscription(orderTopic.Path, AgentSubscriptionName);
        }
        catch (MessagingException e)
        {
            Log.Error(e.Message, e);
        }
    }

    /// <summary>
    /// Logs a transient error and blocks the calling thread for the back-off period, after
    /// which the caller retries the operation.
    /// </summary>
    /// <param name="exception">The exception.</param>
    private static void HandleTransientErrors(Exception exception)
    {
        Log.Error(exception.Message, exception);
        Log.Debug("Will retry sending the message in 2 seconds");

        Thread.Sleep(TransientBackOff);
    }

    /// <summary>
    /// Serializes the specified cart to XML.
    /// </summary>
    /// <param name="cart">The cart.</param>
    /// <returns>The XML text produced by <see cref="XmlSerializer"/>.</returns>
    private static string Serialize(Cart cart)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(Cart));

        using (StringWriter writer = new StringWriter())
        {
            serializer.Serialize(writer, cart);
            return writer.ToString();
        }
    }

    /// <summary>
    /// De-serializes XML text produced by <see cref="Serialize"/> back into a cart.
    /// </summary>
    /// <param name="serializedData">The serialized data.</param>
    /// <returns>The de-serialized cart.</returns>
    private static Cart DeSerialize(string serializedData)
    {
        XmlSerializer deserializer = new XmlSerializer(typeof(Cart));

        using (TextReader reader = new StringReader(serializedData))
        {
            return (Cart)deserializer.Deserialize(reader);
        }
    }
}