Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Throttling concurrency by message type in NServiceBus
using NServiceBus.Pipeline;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
internal class ConcurrencyThrottlingByMessageType : IBehavior<IIncomingLogicalMessageContext, IIncomingLogicalMessageContext>
{
private static readonly Dictionary<Type, Throttle> throttles = new Dictionary<Type, Throttle>
{
{ typeof(FooMessage), new Throttle { Semaphore = new SemaphoreSlim(1), TimeOut = TimeSpan.FromSeconds(1) } },
{ typeof(BarMessage), new Throttle { Semaphore = new SemaphoreSlim(1), TimeOut = TimeSpan.FromSeconds(10) } },
};
public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> next)
{
if (!throttles.TryGetValue(context.Message.MessageType, out var throttle))
{
await next(context).ConfigureAwait(false);
return;
}
if (!await throttle.Semaphore.WaitAsync(throttle.TimeOut).ConfigureAwait(false))
{
throw new TimeoutException($"{context.Message.MessageType} throttle semaphore was not entered within {throttle.TimeOut}.");
}
try
{
await next(context).ConfigureAwait(false);
}
finally
{
throttle.Semaphore.Release();
}
}
private class Throttle
{
public SemaphoreSlim Semaphore { get; set; }
public TimeSpan TimeOut { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.