Skip to content

Instantly share code, notes, and snippets.

@ramonsmits
Last active February 9, 2021 05:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ramonsmits/7fc3a0945449f6f4e303eb3b2fcd7702 to your computer and use it in GitHub Desktop.
Save ramonsmits/7fc3a0945449f6f4e303eb3b2fcd7702 to your computer and use it in GitHub Desktop.
NServiceBus 7 behavior that adds a cancellation token to the context, which can then be retrieved by a handler
//
// NServiceBus v8 will have improved support for cooperative cancellation.
//
// Register behavior:
//
// endpointConfiguration.Pipeline.Register(new CancellationTimeoutBehavior(TimeSpan.FromSeconds(5)), nameof(CancellationTimeoutBehavior));
//
using System;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Extensibility;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
/// <summary>
/// Sample handler that reads the cancellation token from the handler context
/// via the <c>CancellationToken()</c> extension method.
/// </summary>
public class HandlerThatGetsCancelleationTokenFromContext : IHandleMessages<object>
{
    /// <summary>
    /// Waits on an infinite delay that only ends when the token taken from
    /// <paramref name="context"/> is cancelled.
    /// </summary>
    /// <param name="message">The incoming message (not inspected by this sample).</param>
    /// <param name="context">Handler context from which the cancellation token is read.</param>
    public async Task Handle(object message, IMessageHandlerContext context) =>
        await Task.Delay(Timeout.Infinite, context.CancellationToken());
}
/// <summary>
/// Pipeline behavior that creates a <see cref="CancellationTokenSource"/> which is cancelled
/// after a fixed timeout and stores its token in the context extensions, so that later
/// pipeline stages can retrieve it.
/// </summary>
public class CancellationTimeoutBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog Log = LogManager.GetLogger<CancellationTimeoutBehavior>();

    /// <summary>Time after which the token handed to the rest of the pipeline is cancelled.</summary>
    TimeSpan OperationTimeout { get; }

    /// <summary>Creates the behavior with the given timeout.</summary>
    /// <param name="operationTimeout">Delay after which cancellation is requested for each message.</param>
    public CancellationTimeoutBehavior(TimeSpan operationTimeout)
    {
        OperationTimeout = operationTimeout;
    }

    /// <summary>
    /// Stores a timeout-bound cancellation token in the context, invokes the rest of the
    /// pipeline, and logs a warning when the pipeline returned normally even though
    /// cancellation had already been requested.
    /// </summary>
    /// <param name="context">The incoming logical message context.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // The token source cancels itself after OperationTimeout. No separate Task.Delay is
        // needed: an un-awaited delay task would only keep an extra timer alive per message.
        using (var cancellationTokenSource = new CancellationTokenSource(OperationTimeout))
        {
            context.Extensions.Set(cancellationTokenSource.Token);

            await next().ConfigureAwait(false);

            // Reaching this line means next() completed without throwing. If cancellation was
            // requested by now, the downstream code did not react to it (an
            // OperationCanceledException would have skipped this check).
            if (cancellationTokenSource.IsCancellationRequested)
            {
                Log.WarnFormat("Cancellation was requested but not honored while processing message '{0}'", context.MessageId);
            }
        }
    }
}
/// <summary>
/// Extension for reading a <see cref="System.Threading.CancellationToken"/> out of the
/// extensions of an <see cref="IExtendable"/> context.
/// </summary>
public static class IExtendableCancellationTokenExtension
{
    /// <summary>
    /// Returns the <see cref="System.Threading.CancellationToken"/> held in
    /// <c>context.Extensions</c>.
    /// </summary>
    /// <param name="context">The context whose extensions are queried.</param>
    /// <returns>The token obtained from the extensions.</returns>
    // NOTE(review): presumably fails when no token was stored (behavior not registered) - confirm.
    public static CancellationToken CancellationToken(this IExtendable context) =>
        context.Extensions.Get<CancellationToken>();
}
// Alternative way to retrieve cancellation token:
/// <summary>
/// Base class for handlers that want the cancellation token passed in as an explicit
/// argument instead of reading it from the context themselves.
/// </summary>
/// <typeparam name="T">The message type handled.</typeparam>
public abstract class HandleMessages<T> : IHandleMessages<T>
{
    /// <summary>Handles the message with the cancellation token supplied explicitly.</summary>
    /// <param name="message">The incoming message.</param>
    /// <param name="context">The handler context.</param>
    /// <param name="cancellationToken">Token read from <paramref name="context"/> by the two-argument overload.</param>
    public abstract Task Handle(T message, IMessageHandlerContext context, CancellationToken cancellationToken);

    /// <summary>
    /// Reads the token from <paramref name="context"/> and forwards to the three-argument overload.
    /// </summary>
    /// <param name="message">The incoming message.</param>
    /// <param name="context">The handler context from which the token is read.</param>
    public Task Handle(T message, IMessageHandlerContext context) =>
        Handle(message, context, context.CancellationToken());
}
/// <summary>
/// Sample handler that receives the cancellation token as a method argument through
/// the <c>HandleMessages&lt;T&gt;</c> base class.
/// </summary>
public class GetsCancelleationTokenViaBaseClass : HandleMessages<object>
{
    /// <summary>
    /// Waits on an infinite delay that only ends when
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="message">The incoming message (not inspected by this sample).</param>
    /// <param name="context">The handler context (not used by this sample).</param>
    /// <param name="cancellationToken">Token that ends the wait when cancelled.</param>
    public override async Task Handle(object message, IMessageHandlerContext context, CancellationToken cancellationToken) =>
        await Task.Delay(Timeout.Infinite, cancellationToken);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment