Skip to content

Instantly share code, notes, and snippets.

@SzymonPobiega
Created March 13, 2020 11:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SzymonPobiega/57acdc53d64bbff753ca7acdadd1d5cb to your computer and use it in GitHub Desktop.
Save SzymonPobiega/57acdc53d64bbff753ca7acdadd1d5cb to your computer and use it in GitHub Desktop.
Raw message processing using plain NServiceBus
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Extensibility;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NServiceBus.Transport;
namespace TestRawProcessing
{
public delegate Task<ErrorHandleResult> OnError(IncomingMessage messageContext, IDispatchMessages dispatcher);
class Program
{
static void Main(string[] args)
{
Start().GetAwaiter().GetResult();
}
static async Task Start()
{
var config = new EndpointConfiguration("RawEndpoint");
config.UseTransport<LearningTransport>();
config.EnableInstallers();
config.SendFailedMessagesTo("error");
config.Pipeline.Register(b => new RawProcessingBehavior(OnMessage, OnError, b.Build<IDispatchMessages>()), "Process raw messages");
config.Recoverability().CustomPolicy((recoverabilityConfig, context) => RecoverabilityAction.ImmediateRetry());
var endpoint = await Endpoint.Start(config);
Console.WriteLine("Press enter to exit");
Console.ReadLine();
await endpoint.Stop();
}
static Task<ErrorHandleResult> OnError(IncomingMessage messagecontext, IDispatchMessages dispatcher)
{
return Task.FromResult(ErrorHandleResult.RetryRequired);
}
static Task OnMessage(IncomingMessage message, IDispatchMessages dispatcher)
{
var outgoingMessage = new OutgoingMessage(message.MessageId, message.Headers, message.Body);
var transportOperations = new TransportOperation(outgoingMessage, new UnicastAddressTag("Receiver"));
return dispatcher.Dispatch(new TransportOperations(transportOperations), new TransportTransaction(),
new ContextBag());
}
}
class RawProcessingBehavior : Behavior<ITransportReceiveContext>
{
readonly Func<IncomingMessage, IDispatchMessages, Task> onMessage;
readonly OnError onError;
readonly IDispatchMessages dispatcher;
public RawProcessingBehavior(Func<IncomingMessage, IDispatchMessages, Task> onMessage, OnError onError, IDispatchMessages dispatcher)
{
this.onMessage = onMessage;
this.onError = onError;
this.dispatcher = dispatcher;
}
public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
try
{
await onMessage(context.Message, dispatcher);
}
catch (Exception e)
{
var result = await onError(context.Message, dispatcher);
if (result == ErrorHandleResult.RetryRequired)
{
throw new Exception("Retry required", e);
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment