Skip to content

Instantly share code, notes, and snippets.

@ramonsmits
Last active September 27, 2023 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramonsmits/0c307e83cf6913b95f3ec5227c76912b to your computer and use it in GitHub Desktop.
Save ramonsmits/0c307e83cf6913b95f3ec5227c76912b to your computer and use it in GitHub Desktop.
NServiceBus TimeoutBehavior
class TimeoutBehavior : Behavior<IIncomingPhysicalMessageContext>
{
static readonly ILog Log = LogManager.GetLogger<TimeoutBehavior>();
static readonly TimeSpan TimeoutDuration = TimeSpan.FromSeconds(5);
static readonly TimeSpan CancellationThresholdDuration = TimeSpan.FromSeconds(5);
static readonly TimeSpan TerminationDetectionDuration = TimeoutDuration + CancellationThresholdDuration;
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
using var timeoutCTS = new CancellationTokenSource(TimeoutDuration);
using var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, timeoutCTS.Token);
context.ReplaceCancellationToken(linkedCTS.Token); // <-- Need to replace cancellation token on the remainder of the pipeline to cancel ANYTHING on next()
var delayTask = Task.Delay(TerminationDetectionDuration); // 10:10
var firstCompletedTask = await Task.WhenAny(
delay,
next() // or... would have the ability to do next(linkedCTS.Token)
);
if (firstCompletedTask == delayTask)
{
// Handler is hanging after timeout was triggered
Log.FatalFormat("Detected non-terminating handler for message: {0}", context.MessageId);
}
else
{
// TODO: Cancel delayTask to not massively leak them
}
timeoutCTS.Cancel();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment