Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Raw;
using NServiceBus.Routing;
using NServiceBus.Transport;
/// <summary>
/// Last state recorded for a message id in <c>Program.MessageStates</c>.
/// </summary>
enum MessageState
{
    /// <summary>Recorded when the message is received by the "error.log" endpoint handler.</summary>
    Failed = 0,

    /// <summary>Recorded when the message is received by the "retry.filter" endpoint handler.</summary>
    Retried = 1
}
/// <summary>
/// Console host that starts three raw endpoints ("error.filter", "retry.filter" and
/// "error.log") on the MSMQ transport, forwards the messages they receive, and keeps
/// an in-memory table of the last state seen for each message id.
/// </summary>
class Program
{
    // Header read by HandleRetry to find the address a retried message is forwarded to.
    const string TargetAddressHeader = "ServiceControl.TargetEndpointAddress";

    // Header HandleError stamps with the transport address of the endpoint it was given.
    const string RetryToHeader = "ServiceControl.RetryTo";

    // Written concurrently by the message handlers, read by PrintStatus.
    static readonly ConcurrentDictionary<string, MessageState> MessageStates = new ConcurrentDictionary<string, MessageState>();

    /// <summary>Process entry point; blocks until <see cref="Start"/> completes.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Start().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Creates and starts the endpoints, prints the state table each time a line is read
    /// from standard input, and stops the endpoints once standard input is exhausted.
    /// </summary>
    /// <returns>A task that completes after all endpoints have been stopped.</returns>
    static async Task Start()
    {
        // Declared before the configurations because each handler lambda captures the
        // *other* endpoint; the variables are assigned below, before anything is started.
        IStartableRawEndpoint errorFilterEndpoint = null;
        IStartableRawEndpoint retryFilterEndpoint = null;

        var errorFilterConfig = Configure(RawEndpointConfiguration.Create("error.filter", (m, _) => HandleError(m, retryFilterEndpoint), "poison"));
        var retryFilterConfig = Configure(RawEndpointConfiguration.Create("retry.filter", (m, _) => HandleRetry(m, errorFilterEndpoint), "poison"));
        var errorLogConsumerConfig = Configure(RawEndpointConfiguration.Create("error.log", (m, _) => HandleErrorLog(m), "poison"));

        // Both filter endpoints are created before either is started so that the
        // captured references are non-null by the time a handler can run.
        errorFilterEndpoint = await RawEndpoint.Create(errorFilterConfig);
        retryFilterEndpoint = await RawEndpoint.Create(retryFilterConfig);

        var errorFilter = await errorFilterEndpoint.Start();
        var retryFilter = await retryFilterEndpoint.Start();
        var logConsumer = await RawEndpoint.Start(errorLogConsumerConfig);

        Console.WriteLine("Press <enter> to print status.");

        // Console.ReadLine returns null once standard input is closed (EOF). Ending the
        // loop there keeps the shutdown sequence below reachable and avoids spinning
        // forever on a closed or redirected input stream.
        while (Console.ReadLine() != null)
        {
            PrintStatus();
        }

        // Stop receiving on both filters first, then stop them, then the log consumer.
        var stoppedRetryFilter = await retryFilter.StopReceiving();
        var stoppedErrorFilter = await errorFilter.StopReceiving();
        await stoppedRetryFilter.Stop();
        await stoppedErrorFilter.Stop();
        await logConsumer.Stop();
    }

    /// <summary>Applies the settings shared by all endpoints: MSMQ transport and automatic queue creation.</summary>
    /// <param name="configuration">The endpoint configuration to adjust.</param>
    /// <returns>The same <paramref name="configuration"/> instance.</returns>
    static RawEndpointConfiguration Configure(RawEndpointConfiguration configuration)
    {
        configuration.UseTransport<MsmqTransport>();
        configuration.AutoCreateQueue();
        return configuration;
    }

    /// <summary>Handler for "error.log": records the message as <see cref="MessageState.Failed"/>.</summary>
    /// <param name="message">The received message; its message-id header is used as the key.</param>
    /// <returns>An already completed task.</returns>
    static Task HandleErrorLog(MessageContext message)
    {
        var messageId = message.Headers[Headers.MessageId];

        // Indexer set is an atomic add-or-overwrite on ConcurrentDictionary.
        MessageStates[messageId] = MessageState.Failed;
        return Task.FromResult(0);
    }

    /// <summary>
    /// Handler for "retry.filter": records the message as <see cref="MessageState.Retried"/>,
    /// removes the target-address header and forwards the message to the address it named.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <param name="dispatcher">Endpoint used to dispatch the forwarded message.</param>
    /// <returns>The task returned by the dispatch call.</returns>
    static Task HandleRetry(MessageContext message, IRawEndpoint dispatcher)
    {
        var messageId = message.Headers[Headers.MessageId];

        // Read the destination before touching the state table: if the header is missing
        // the lookup throws, and the message must not be reported as retried.
        var destination = message.Headers[TargetAddressHeader];

        MessageStates[messageId] = MessageState.Retried;

        Console.WriteLine($"Forwarding a retry message to {destination}");
        message.Headers.Remove(TargetAddressHeader);

        var op = new TransportOperation(new OutgoingMessage(message.MessageId, message.Headers, message.Body), new UnicastAddressTag(destination));
        return dispatcher.Dispatch(new TransportOperations(op), message.TransportTransaction, message.Extensions);
    }

    /// <summary>
    /// Handler for "error.filter": stamps the retry-to header with the dispatcher's
    /// transport address and forwards the message to the "error" address.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <param name="dispatcher">Endpoint whose address is stamped and which dispatches the message.</param>
    /// <returns>The task returned by the dispatch call.</returns>
    static Task HandleError(MessageContext message, IRawEndpoint dispatcher)
    {
        message.Headers[RetryToHeader] = dispatcher.TransportAddress;

        var op = new TransportOperation(new OutgoingMessage(message.MessageId, message.Headers, message.Body), new UnicastAddressTag("error"));
        return dispatcher.Dispatch(new TransportOperations(op), message.TransportTransaction, message.Extensions);
    }

    /// <summary>Writes every tracked message id and its current state to the console.</summary>
    static void PrintStatus()
    {
        Console.WriteLine("Message states:");
        foreach (var messageState in MessageStates)
        {
            Console.WriteLine($"{messageState.Key}: {messageState.Value}");
        }
        Console.WriteLine();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment