Last active
July 4, 2023 16:09
-
-
Save ramonsmits/8cb5b800d74b4b573f8a1659b9b92c7f to your computer and use it in GitHub Desktop.
Simple messaging bridge forwarder acting as a shovel between different NServiceBus transports using NServiceBus 8.x with NServiceBus.Raw 4.x
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using NServiceBus; | |
using NServiceBus.Logging; | |
using NServiceBus.Raw; | |
using NServiceBus.Routing; | |
using NServiceBus.Transport; | |
// Entry point: runs a Shovel that forwards messages for "testqueue" from the
// "source" learning-transport directory to the "target" directory until Ctrl+C.
CancellationTokenSource cts = new();
Console.CancelKeyPress += (s, ea) =>
{
    // Keep the process alive on Ctrl+C so the shovel can be stopped in an orderly way below.
    ea.Cancel = true;
    cts.Cancel();
};

LogManager.Use<DefaultFactory>().Level(LogLevel.Debug);

var source = new LearningTransport { StorageDirectory = "source" };
var destination = new LearningTransport { StorageDirectory = "target" };
var shovel = new Shovel("testqueue", source, destination);

// Tracks whether Start completed, so Stop is only attempted on a running shovel
// (Stop throws InvalidOperationException when the shovel is not running).
var started = false;
try
{
    await shovel.Start(cts.Token);
    started = true;

    // Never completes normally: it ends by throwing OperationCanceledException on Ctrl+C.
    await Task.Delay(Timeout.Infinite, cts.Token);
}
catch (OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
    // Expected: shutdown was requested via Ctrl+C.
}
finally
{
    if (started)
    {
        // cts.Token is already cancelled at this point, so it must not be used for the
        // shutdown itself; pass a fresh (never-cancelled) token instead.
        await shovel.Stop(CancellationToken.None);
    }
}
/// <summary>
/// Forwards every message received on <c>endpointName</c> via the source transport to an
/// address of the same name on the target transport, using two raw endpoints
/// (one receiving, one send-only).
/// </summary>
class Shovel
{
    readonly ILog log;
    readonly string endpointName;
    readonly string poisonMessageQueue;
    readonly TransportDefinition sourceTransportDefinition;
    readonly TransportDefinition targetTransportDefinition;

    // Both are null while stopped and non-null while running.
    IReceivingRawEndpoint? source, target;

    /// <summary>
    /// Transaction mode applied to both transport definitions when <see cref="Start"/> is called.
    /// Defaults to <see cref="TransportTransactionMode.ReceiveOnly"/>.
    /// </summary>
    public TransportTransactionMode TransportTransactionMode { get; set; } = TransportTransactionMode.ReceiveOnly;

    /// <summary>Creates a shovel; nothing is started until <see cref="Start"/> is called.</summary>
    /// <param name="endpointName">Name used for both raw endpoints and as the destination address on the target.</param>
    /// <param name="sourceTransportDefinition">Transport that messages are received from.</param>
    /// <param name="targetTransportDefinition">Transport that messages are forwarded to.</param>
    /// <param name="poisonMessageQueue">Poison message queue of the receiving endpoint; defaults to <c>endpointName + "_error"</c>.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    public Shovel(string endpointName, TransportDefinition sourceTransportDefinition, TransportDefinition targetTransportDefinition, string? poisonMessageQueue = null)
    {
        this.endpointName = endpointName ?? throw new ArgumentNullException(nameof(endpointName));
        this.sourceTransportDefinition = sourceTransportDefinition ?? throw new ArgumentNullException(nameof(sourceTransportDefinition));
        this.targetTransportDefinition = targetTransportDefinition ?? throw new ArgumentNullException(nameof(targetTransportDefinition));
        this.poisonMessageQueue = poisonMessageQueue ?? endpointName + "_error";

        log = LogManager.GetLogger($"{GetType()}-{sourceTransportDefinition}-{targetTransportDefinition}");
    }

    /// <summary>Starts the send-only target endpoint and then the receiving source endpoint.</summary>
    /// <param name="cancellationToken">Token passed to the endpoint start-up calls.</param>
    /// <exception cref="InvalidOperationException">The shovel is already running.</exception>
    public async Task Start(CancellationToken cancellationToken = default)
    {
        if (source != null || target != null) throw new InvalidOperationException("Not stopped");

        sourceTransportDefinition.TransportTransactionMode = TransportTransactionMode;
        targetTransportDefinition.TransportTransactionMode = TransportTransactionMode;

        var sourceConfig = RawEndpointConfiguration.Create(
            endpointName: endpointName,
            onMessage: OnMessage,
            poisonMessageQueue: poisonMessageQueue,
            transportDefinition: sourceTransportDefinition
        );
        var targetConfig = RawEndpointConfiguration.CreateSendOnly(
            endpointName: endpointName,
            targetTransportDefinition
        );

        // The target must be running and published in the field BEFORE the source starts:
        // OnMessage reads `target`, and the source may invoke it as soon as it is started.
        var startedTarget = await RawEndpoint.Start(targetConfig, cancellationToken);
        target = startedTarget;
        try
        {
            source = await RawEndpoint.Start(sourceConfig, cancellationToken);
        }
        catch
        {
            // Do not leave a half-started shovel behind: release the target and return to
            // the stopped state so Start can be retried.
            target = null;
            try
            {
                await startedTarget.Stop(CancellationToken.None);
            }
            catch (Exception stopError)
            {
                // Logged only, so the original start-up failure is the one that propagates.
                log.Warn("Stopping target endpoint after failed start", stopError);
            }
            throw;
        }
    }

    /// <summary>
    /// Message callback of the source endpoint: re-dispatches the incoming message
    /// (same headers and body) through the target endpoint.
    /// </summary>
    async Task OnMessage(MessageContext context, IMessageDispatcher dispatcher, CancellationToken cancellationToken)
    {
        try
        {
            var sender = target ?? throw new InvalidOperationException("Target endpoint is not started");

            // The dictionary indexer throws on a missing key, so the native-id fallback
            // has to go through TryGetValue to be reachable.
            var messageId = context.Headers.TryGetValue(Headers.MessageId, out var headerMessageId) && headerMessageId != null
                ? headerMessageId
                : context.NativeMessageId;

            var request = new OutgoingMessage(
                messageId: messageId,
                headers: context.Headers,
                body: context.Body
            );
            var operation = new TransportOperation(
                request,
                new UnicastAddressTag(sender.EndpointName));

            // A fresh TransportTransaction is used: the dispatch is not enlisted in the
            // receive transaction of the source transport.
            await sender.Dispatch(
                outgoingMessages: new TransportOperations(operation),
                transaction: new TransportTransaction(),
                cancellationToken
            );
        }
        catch (Exception e) when (e is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            // Cancellation caused by shutdown is not an error and is not logged; it still
            // propagates because the filter leaves it uncaught.
            log.Error("OnMessage", e);
            throw;
        }
    }

    /// <summary>Stops the source endpoint, then the target endpoint, and returns to the stopped state.</summary>
    /// <param name="cancellationToken">Token passed to the endpoint stop calls.</param>
    /// <exception cref="InvalidOperationException">The shovel is not running (thrown synchronously).</exception>
    public Task Stop(CancellationToken cancellationToken = default)
    {
        if (source == null || target == null) throw new InvalidOperationException("Not running");
        return StopEndpoints(source, target, cancellationToken);
    }

    // Stops receiving first and the sender second, then clears both fields so Start can be called again.
    async Task StopEndpoints(IReceivingRawEndpoint runningSource, IReceivingRawEndpoint runningTarget, CancellationToken cancellationToken)
    {
        try
        {
            // Source first: while it shuts down, handlers that are still running can
            // keep dispatching through the target, which is stopped only afterwards.
            await runningSource.Stop(cancellationToken);
        }
        finally
        {
            try
            {
                await runningTarget.Stop(cancellationToken);
            }
            finally
            {
                source = null;
                target = null;
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment