Last active
September 27, 2023 15:59
-
-
Save ramonsmits/0c307e83cf6913b95f3ec5227c76912b to your computer and use it in GitHub Desktop.
NServiceBus TimeoutBehavior
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Pipeline behavior that puts a time limit on the remainder of the incoming pipeline.
/// After <see cref="TimeoutDuration"/> the cancellation token handed to the rest of the
/// pipeline is cancelled. If the pipeline has still not completed after
/// <see cref="TerminationDetectionDuration"/>, a fatal log entry is written because the
/// handler did not react to the cancellation.
/// </summary>
class TimeoutBehavior : Behavior<IIncomingPhysicalMessageContext>
{
    static readonly ILog Log = LogManager.GetLogger<TimeoutBehavior>();

    // Time after which the token passed down the pipeline is cancelled.
    static readonly TimeSpan TimeoutDuration = TimeSpan.FromSeconds(5);

    // Extra grace period a handler gets to observe the cancellation and return.
    static readonly TimeSpan CancellationThresholdDuration = TimeSpan.FromSeconds(5);

    // Total time after which a still-running handler is reported as non-terminating.
    static readonly TimeSpan TerminationDetectionDuration = TimeoutDuration + CancellationThresholdDuration;

    /// <summary>
    /// Runs the rest of the pipeline with a token that is cancelled after
    /// <see cref="TimeoutDuration"/>, and watches for handlers that keep running past
    /// <see cref="TerminationDetectionDuration"/>.
    /// </summary>
    /// <param name="context">The incoming physical message context.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>
    /// A task that completes when the remainder of the pipeline completes, or when the
    /// termination-detection delay elapses first. Exceptions thrown by the pipeline are
    /// propagated to the caller when the pipeline finishes before the detection delay.
    /// </returns>
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        using var timeoutCTS = new CancellationTokenSource(TimeoutDuration);
        using var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, timeoutCTS.Token);
        // Separate source so the watchdog delay can be released as soon as the pipeline finishes.
        using var delayCTS = new CancellationTokenSource();

        context.ReplaceCancellationToken(linkedCTS.Token); // <-- Need to replace cancellation token on the remainder of the pipeline to cancel ANYTHING on next()

        try
        {
            var delayTask = Task.Delay(TerminationDetectionDuration, delayCTS.Token);
            var nextTask = next(); // or... would have the ability to do next(linkedCTS.Token)

            var firstCompletedTask = await Task.WhenAny(delayTask, nextTask).ConfigureAwait(false);

            if (firstCompletedTask == delayTask)
            {
                // Handler is hanging after timeout was triggered.
                // NOTE(review): nextTask is abandoned here, so its eventual outcome is never observed.
                Log.FatalFormat("Detected non-terminating handler for message: {0}", context.MessageId);
                return;
            }

            // Pipeline finished first: stop the watchdog timer so delay tasks do not pile up.
            delayCTS.Cancel();

            // Task.WhenAny never throws for the inner task; await it so a faulted or
            // cancelled pipeline surfaces its exception instead of being treated as success.
            await nextTask.ConfigureAwait(false);
        }
        finally
        {
            timeoutCTS.Cancel();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment