Skip to content

Instantly share code, notes, and snippets.

@ramonsmits
Last active July 4, 2023 16:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramonsmits/8cb5b800d74b4b573f8a1659b9b92c7f to your computer and use it in GitHub Desktop.
Save ramonsmits/8cb5b800d74b4b573f8a1659b9b92c7f to your computer and use it in GitHub Desktop.
Simple messaging bridge forwarder acting as a shovel between different NServiceBus transports using NServiceBus 8.x with NServiceBus.Raw 4.x
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Raw;
using NServiceBus.Routing;
using NServiceBus.Transport;
// Entry point: runs a Shovel between two LearningTransport instances until Ctrl+C is pressed.
CancellationTokenSource cts = new();
Console.CancelKeyPress += (s, ea) =>
{
    // Suppress the default process termination so the shovel can be stopped explicitly below.
    ea.Cancel = true;
    cts.Cancel();
};
LogManager.Use<DefaultFactory>().Level(LogLevel.Debug);
var source = new LearningTransport { StorageDirectory = "source" };
var destination = new LearningTransport { StorageDirectory = "target" };
var shovel = new Shovel("testqueue", source, destination);
try
{
    await shovel.Start(cts.Token);
}
catch (OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
    // Ctrl+C was pressed while starting; Start did not complete, so there is nothing to stop.
    return;
}
try
{
    // An infinite delay can only end by cancellation, so the resulting
    // OperationCanceledException is the normal "shut down now" signal.
    await Task.Delay(Timeout.Infinite, cts.Token);
}
catch (OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
}
// cts.Token is already cancelled at this point, so it must not be used for stopping:
// pass a token that is not cancelled instead.
await shovel.Stop(CancellationToken.None);
/// <summary>
/// Forwards every message received from a queue on the source transport to the
/// queue with the same name on the target transport, using two raw endpoints.
/// </summary>
class Shovel
{
    readonly ILog log;
    readonly string endpointName;
    readonly string poisonMessageQueue;
    readonly TransportDefinition sourceTransportDefinition;
    readonly TransportDefinition targetTransportDefinition;
    IReceivingRawEndpoint? source, target;

    /// <summary>
    /// Transaction mode applied to both transport definitions when <see cref="Start"/> is called.
    /// </summary>
    public TransportTransactionMode TransportTransactionMode { get; set; } = TransportTransactionMode.ReceiveOnly;

    /// <summary>
    /// Creates a shovel for the given queue name and transports.
    /// </summary>
    /// <param name="endpointName">Name used for both the source (receiving) and target (send-only) raw endpoints.</param>
    /// <param name="sourceTransportDefinition">Transport to receive messages from.</param>
    /// <param name="targetTransportDefinition">Transport to forward messages to.</param>
    /// <param name="poisonMessageQueue">Poison message queue for the source endpoint; defaults to <c>endpointName + "_error"</c>.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    public Shovel(string endpointName, TransportDefinition sourceTransportDefinition, TransportDefinition targetTransportDefinition, string? poisonMessageQueue = null)
    {
        if (endpointName == null) throw new ArgumentNullException(nameof(endpointName));
        if (sourceTransportDefinition == null) throw new ArgumentNullException(nameof(sourceTransportDefinition));
        if (targetTransportDefinition == null) throw new ArgumentNullException(nameof(targetTransportDefinition));
        poisonMessageQueue ??= endpointName + "_error";
        log = LogManager.GetLogger($"{GetType()}-{sourceTransportDefinition}-{targetTransportDefinition}");
        this.sourceTransportDefinition = sourceTransportDefinition;
        this.targetTransportDefinition = targetTransportDefinition;
        this.endpointName = endpointName;
        this.poisonMessageQueue = poisonMessageQueue;
    }

    /// <summary>
    /// Starts the send-only target endpoint and then the receiving source endpoint.
    /// </summary>
    /// <param name="cancellationToken">Token passed to both endpoint start operations.</param>
    /// <exception cref="InvalidOperationException">The shovel is already started.</exception>
    public async Task Start(CancellationToken cancellationToken = default)
    {
        if (source != null || target != null) throw new InvalidOperationException("Not stopped");
        sourceTransportDefinition.TransportTransactionMode = TransportTransactionMode;
        targetTransportDefinition.TransportTransactionMode = TransportTransactionMode;
        var sourceConfig = RawEndpointConfiguration.Create(
            endpointName: endpointName,
            onMessage: OnMessage,
            poisonMessageQueue: poisonMessageQueue,
            transportDefinition: sourceTransportDefinition
        );
        var targetConfig = RawEndpointConfiguration.CreateSendOnly(
            endpointName: endpointName,
            targetTransportDefinition
        );

        // The target must be running and assigned BEFORE the source starts receiving;
        // otherwise OnMessage could run while the target field is still null.
        var startedTarget = await RawEndpoint.Start(targetConfig, cancellationToken);
        target = startedTarget;
        try
        {
            source = await RawEndpoint.Start(sourceConfig, cancellationToken);
        }
        catch
        {
            // Roll back so a failed Start leaves the shovel in the stopped state.
            target = null;
            try
            {
                await startedTarget.Stop(CancellationToken.None);
            }
            catch (Exception stopException)
            {
                // Log and continue so the original start failure is the one that propagates.
                log.Error("Start", stopException);
            }
            throw;
        }
    }

    /// <summary>
    /// Message callback of the source endpoint: re-sends the incoming message,
    /// with its headers and body unchanged, through the target endpoint.
    /// Failures are logged and rethrown to the caller.
    /// </summary>
    async Task OnMessage(MessageContext context, IMessageDispatcher dispatcher, CancellationToken cancellationToken)
    {
        try
        {
            var destination = target ?? throw new InvalidOperationException("Target endpoint is not started");

            // The indexer throws when the header is absent, so use TryGetValue
            // to make the fallback to the native message id actually reachable.
            var messageId = context.Headers.TryGetValue(Headers.MessageId, out var headerMessageId) && headerMessageId != null
                ? headerMessageId
                : context.NativeMessageId;

            var request = new OutgoingMessage(
                messageId: messageId,
                headers: context.Headers,
                body: context.Body
            );
            var operation = new TransportOperation(
                request,
                new UnicastAddressTag(destination.EndpointName));
            await destination.Dispatch(
                outgoingMessages: new TransportOperations(operation),
                transaction: new TransportTransaction(),
                cancellationToken
            );
        }
        catch (Exception e)
        {
            log.Error("OnMessage", e);
            throw;
        }
    }

    /// <summary>
    /// Stops the source endpoint, then the target endpoint, and returns the shovel
    /// to the stopped state so that <see cref="Start"/> can be called again.
    /// </summary>
    /// <param name="cancellationToken">Token passed to both endpoint stop operations.</param>
    /// <exception cref="InvalidOperationException">The shovel is not running (thrown synchronously).</exception>
    public Task Stop(CancellationToken cancellationToken = default)
    {
        var runningSource = source;
        var runningTarget = target;
        if (runningSource == null || runningTarget == null) throw new InvalidOperationException("Not running");
        return StopInOrder(runningSource, runningTarget, cancellationToken);
    }

    // Stops receiving first so messages still being handled can use the target, then stops the target.
    async Task StopInOrder(IReceivingRawEndpoint runningSource, IReceivingRawEndpoint runningTarget, CancellationToken cancellationToken)
    {
        try
        {
            await runningSource.Stop(cancellationToken);
        }
        finally
        {
            try
            {
                await runningTarget.Stop(cancellationToken);
            }
            finally
            {
                // Clear the fields even if a stop failed; otherwise Start would throw "Not stopped" forever.
                source = null;
                target = null;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment