Skip to content

Instantly share code, notes, and snippets.

@timReynolds
Last active October 20, 2016 10:01
Show Gist options
  • Save timReynolds/e5d214ba6c0550a4a0c8470e8aa96008 to your computer and use it in GitHub Desktop.
Save timReynolds/e5d214ba6c0550a4a0c8470e8aa96008 to your computer and use it in GitHub Desktop.
NServiceBus Notification FailedMessageResponder
/// <summary>
/// Log format used when no failed-message response is registered for a message type.
/// {0} is the message type taken from the failed message's headers.
/// </summary>
public const string FailedMessageResponderNotResponding = "No failed message response defined for {0}";
/// <summary>
/// Log format used when a failed-message response is about to be created.
/// {0} is the response type being created, {1} is the type of the failed message.
/// </summary>
public const string FailedMessageResponderResponding = "Creating {0} for failed message {1}";
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using ASI.Logging.Contracts;
using ASI.Message;
using Newtonsoft.Json;
using NServiceBus;
using NServiceBus.Faults;
using PF.Message.Requests.FinancedTransactions;
using PF.Message.Responses.FinancedTransactions;
using PFBOI.Saga.Host.Constants;
using JsonSerializer = Newtonsoft.Json.JsonSerializer;
namespace PFBOI.Saga.Host.Functions
{
    /// <summary>
    /// Handles messages that have been moved to the error queue by sending a
    /// "failed" response back to the endpoint the message originated from.
    /// Only message types registered in the internal lookup are responded to.
    /// </summary>
    public class FailedMessageResponder : IFailedMessageResponder
    {
        /// <summary>Log format used when a failed message body cannot be turned into an <see cref="IAsiMessage"/>.</summary>
        private const string UnreadableFailedMessageBody = "Unable to read body of failed message {0}; no failed message response sent";

        // The enclosed-message-types header may list several types separated by ';'.
        private static readonly char[] EnclosedMessageTypeSeparators = { ';' };

        private readonly IAsiLogger asiLogger;
        private readonly IEndpointInstance endpointInstance;

        // Maps the assembly-qualified name of a request type to the response type sent when it fails.
        private readonly Dictionary<string, Type> responseToMessages;

        /// <summary>
        /// Creates the responder and registers the request/response pairs it knows about.
        /// </summary>
        /// <param name="asiLogger">Logger used for diagnostic output.</param>
        /// <param name="endpointInstance">Endpoint used to send the failed-message response.</param>
        public FailedMessageResponder(IAsiLogger asiLogger, IEndpointInstance endpointInstance)
        {
            this.asiLogger = asiLogger;
            this.endpointInstance = endpointInstance;
            responseToMessages = new Dictionary<string, Type>
            {
                {typeof(FinancedTransactionsRequest).AssemblyQualifiedName, typeof(FinancedTransactionsFailedResponse)}
            };
        }

        /// <summary>
        /// Event handler invoked for a message that was sent to the error queue.
        /// When a response type is registered for the failed message's type, a new response
        /// carrying the failed message's correlation id is sent to the originating endpoint.
        /// </summary>
        /// <param name="sender">Event source; not used.</param>
        /// <param name="failedMessage">The failed message (headers and raw body).</param>
        public void HandleFailedMessage(object sender, FailedMessage failedMessage)
        {
            // A message that ends up in the error queue may lack the header altogether,
            // so read it without throwing.
            string enclosedMessageTypes;
            failedMessage.Headers.TryGetValue(Headers.EnclosedMessageTypes, out enclosedMessageTypes);

            string messageType;
            Type responseEventType;
            if (!TryFindResponse(enclosedMessageTypes, out messageType, out responseEventType))
            {
                asiLogger.DebugFormat(this, HostConstants.FailedMessageResponderNotResponding, enclosedMessageTypes);
                return;
            }

            asiLogger.DebugFormat(this, HostConstants.FailedMessageResponderResponding, responseEventType, messageType);

            // 'as' instead of a hard cast: an empty or unexpected body is reported below
            // rather than surfacing as a NullReferenceException/InvalidCastException.
            var failedMessageBody = GetMessageFromBody(failedMessage.Body, messageType) as IAsiMessage;
            if (failedMessageBody == null)
            {
                asiLogger.DebugFormat(this, UnreadableFailedMessageBody, messageType);
                return;
            }

            var originatingEndpoint = failedMessage.Headers[Headers.OriginatingEndpoint];
            var originatingMachine = failedMessage.Headers[Headers.OriginatingMachine];

            var failedMessageEvent = (IAsiMessage) Activator.CreateInstance(responseEventType);
            failedMessageEvent.CorrelationId = failedMessageBody.CorrelationId;

            // NOTE(review): the result of Send is discarded, as in the original code. If Send
            // returns a Task (presumably the case for IEndpointInstance), a failed send goes
            // unobserved here - confirm whether that is acceptable for this handler.
            endpointInstance.Send($"{originatingEndpoint}@{originatingMachine}", failedMessageEvent);
        }

        /// <summary>
        /// Looks for the first type listed in the enclosed-message-types header that has a
        /// registered response.
        /// </summary>
        /// <param name="enclosedMessageTypes">Raw header value; may be null or empty.</param>
        /// <param name="messageType">The matching type name from the header, or null.</param>
        /// <param name="responseEventType">The registered response type, or null.</param>
        /// <returns>True when a registered response was found.</returns>
        private bool TryFindResponse(string enclosedMessageTypes, out string messageType, out Type responseEventType)
        {
            messageType = null;
            responseEventType = null;

            if (string.IsNullOrWhiteSpace(enclosedMessageTypes))
            {
                return false;
            }

            var candidates = enclosedMessageTypes.Split(EnclosedMessageTypeSeparators, StringSplitOptions.RemoveEmptyEntries);
            foreach (var candidate in candidates)
            {
                var typeName = candidate.Trim();
                if (responseToMessages.TryGetValue(typeName, out responseEventType))
                {
                    messageType = typeName;
                    return true;
                }
            }

            return false;
        }

        /// <summary>
        /// Deserializes a UTF-8 JSON message body into the type named by <paramref name="messageTypeString"/>.
        /// </summary>
        /// <param name="messageBody">Raw message body.</param>
        /// <param name="messageTypeString">Assembly-qualified name of the type to deserialize into.</param>
        /// <returns>The deserialized message, or null when the body is empty or the type cannot be resolved.</returns>
        private object GetMessageFromBody(byte[] messageBody, string messageTypeString)
        {
            if (messageBody == null || messageBody.Length == 0)
            {
                return null;
            }

            var messageType = Type.GetType(messageTypeString);
            if (messageType == null)
            {
                return null;
            }

            // All three objects are disposable; the original left the readers undisposed.
            using (var stream = new MemoryStream(messageBody))
            using (var streamReader = new StreamReader(stream, Encoding.UTF8))
            using (var jsonReader = new JsonTextReader(streamReader))
            {
                return JsonSerializer.Create().Deserialize(jsonReader, messageType);
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using ASI.Logging.Contracts;
using Moq;
using Newtonsoft.Json;
using NServiceBus.Faults;
using NServiceBus.Testing;
using NUnit.Framework;
using PF.Message.Requests.FinancedTransactions;
using PF.Message.Responses.FinancedTransactions;
using PFBOI.Saga.Host.Constants;
using PFBOI.Saga.Host.Functions;
namespace PFBOI.Saga.Host.UnitTests.Functions
{
    /// <summary>
    /// Unit tests for <see cref="FailedMessageResponder"/>.
    /// </summary>
    [TestFixture]
    public class FailedMessageResponderTests
    {
        private Mock<IAsiLogger> asiLogger;
        private TestableEndpointInstance endpointInstance;
        private FailedMessageResponder sut;

        /// <summary>Creates a fresh logger mock, endpoint and responder for every test.</summary>
        [SetUp]
        public void SetUp()
        {
            asiLogger = new Mock<IAsiLogger>();
            endpointInstance = new TestableEndpointInstance();
            sut = new FailedMessageResponder(asiLogger.Object, endpointInstance);
        }

        /// <summary>
        /// A failed message whose type has no registered response is logged and nothing is sent.
        /// </summary>
        [Test]
        public void HandleFailedMessage_WhenGivenAMessageTypeThatIsNotRespondedToOnError_ShouldNotRespondToError()
        {
            // Arrange
            var messageType = "NotAMessageType";
            var failedMessage = CreateFailedMessage(new byte[] { }, messageType);

            // Act
            sut.HandleFailedMessage(new object(), failedMessage);

            // Assert
            asiLogger.Verify(a => a.DebugFormat(sut, HostConstants.FailedMessageResponderNotResponding, messageType), Times.Once);
            // The test name promises that no response is sent, so check that explicitly.
            Assert.AreEqual(0, endpointInstance.SentMessages.Length);
        }

        /// <summary>
        /// A failed message whose type has a registered response results in exactly one sent
        /// response carrying the failed message's correlation id.
        /// </summary>
        [Test]
        public void HandleFailedMessage_WhenGivenAMessageTypeThatIsRespondedToOnError_ShouldRespondToError()
        {
            // Arrange
            var messageType = typeof(FinancedTransactionsRequest).AssemblyQualifiedName;
            var originatingEndpoint = "originatingEndpoint";
            var originatingMachine = "originatingMachine";
            var failedMessageBody = new FinancedTransactionsRequest
            {
                CorrelationId = Guid.NewGuid()
            };
            var failedMessage = CreateFailedMessage(SerializeToByteArray(failedMessageBody), messageType, originatingEndpoint, originatingMachine);

            // Act
            sut.HandleFailedMessage(new object(), failedMessage);

            // Assert
            asiLogger.Verify(a => a.DebugFormat(sut, HostConstants.FailedMessageResponderResponding, typeof(FinancedTransactionsFailedResponse), messageType), Times.Once);
            Assert.AreEqual(1, endpointInstance.SentMessages.Length);
            var sentMessage = endpointInstance.SentMessages.First();
            Assert.IsInstanceOf<FinancedTransactionsFailedResponse>(sentMessage.Message);
            var message = (FinancedTransactionsFailedResponse) sentMessage.Message;
            Assert.AreEqual(failedMessageBody.CorrelationId, message.CorrelationId);
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to UTF-8 encoded JSON.
        /// </summary>
        /// <param name="message">Object to serialize.</param>
        /// <returns>The serialized bytes.</returns>
        private static byte[] SerializeToByteArray(object message)
        {
            using (var stream = new MemoryStream())
            using (var streamWriter = new StreamWriter(stream, Encoding.UTF8))
            using (var jsonWriter = new JsonTextWriter(streamWriter))
            {
                JsonSerializer.Create().Serialize(jsonWriter, message);
                // Flush before copying so every buffered character has reached the stream.
                jsonWriter.Flush();
                return stream.ToArray();
            }
        }

        /// <summary>
        /// Builds a <see cref="FailedMessage"/> with the headers the responder reads.
        /// </summary>
        /// <param name="body">Raw message body.</param>
        /// <param name="enclosedMessageTypes">Value for the enclosed-message-types header.</param>
        /// <param name="originatingEndpoint">Value for the originating-endpoint header.</param>
        /// <param name="originatingMachine">Value for the originating-machine header.</param>
        /// <param name="exceptionMessage">Message of the exception attached to the failed message.</param>
        /// <returns>The failed message with a new random id.</returns>
        private static FailedMessage CreateFailedMessage(byte[] body, string enclosedMessageTypes = "", string originatingEndpoint = "", string originatingMachine = "", string exceptionMessage = "")
        {
            var headers = new Dictionary<string, string>
            {
                {"NServiceBus.EnclosedMessageTypes", enclosedMessageTypes},
                {"NServiceBus.OriginatingEndpoint", originatingEndpoint},
                {"NServiceBus.OriginatingMachine", originatingMachine},
            };
            return new FailedMessage(Guid.NewGuid().ToString(), headers, body, new Exception(exceptionMessage), "");
        }
    }
}
/// <summary>
/// Hooks the error-queue logging handler and the resolved
/// <see cref="IFailedMessageResponder"/> up to the "message sent to error queue" notification.
/// </summary>
/// <param name="notifications">Notifications object whose error notifications are subscribed to.</param>
private void SubscribeToErrorQueueNotifications(Notifications notifications)
{
    var errorNotifications = notifications.Errors;
    var failedMessageResponder = Container.Resolve<IFailedMessageResponder>();

    // The logging handler is subscribed first, the responder second.
    errorNotifications.MessageSentToErrorQueue += LogMessageSentToErrorQueue;
    errorNotifications.MessageSentToErrorQueue += failedMessageResponder.HandleFailedMessage;
}
@scrooby
Copy link

scrooby commented Oct 6, 2016

You'll want to publish the failedMessageEvent instead of sending it; that would also mean the code to get the originating endpoint and machine name could be removed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment