Skip to content

Instantly share code, notes, and snippets.

@adamralph
Created December 10, 2018 13:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save adamralph/a858847216a0930b80659a77b17b1229 to your computer and use it in GitHub Desktop.
Save adamralph/a858847216a0930b80659a77b17b1229 to your computer and use it in GitHub Desktop.
Throttling concurrency by message type in NServiceBus
using NServiceBus.Pipeline;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Pipeline behavior that limits how many incoming logical messages of selected types
/// may proceed down the pipeline at the same time.
/// </summary>
/// <remarks>
/// Every throttled message type owns one semaphore stored in a static table, so the limit
/// is shared by all instances of this behavior. Message types that have no entry in the
/// table are passed to the next pipeline step without any throttling.
/// </remarks>
internal class ConcurrencyThrottlingByMessageType : IBehavior<IIncomingLogicalMessageContext, IIncomingLogicalMessageContext>
{
    // Limits keyed by message type. The table is populated once here and only read afterwards.
    private static readonly Dictionary<Type, Throttle> ThrottlesByMessageType = new Dictionary<Type, Throttle>
    {
        [typeof(FooMessage)] = new Throttle(1, TimeSpan.FromSeconds(1)),
        [typeof(BarMessage)] = new Throttle(1, TimeSpan.FromSeconds(10)),
    };

    /// <summary>
    /// Runs the rest of the pipeline, first acquiring the message type's semaphore when one is configured.
    /// </summary>
    /// <param name="context">The incoming logical message context; its message type selects the throttle.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>A task that completes when the remainder of the pipeline has completed.</returns>
    /// <exception cref="TimeoutException">
    /// The semaphore for the message type could not be entered within the configured wait time.
    /// </exception>
    public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> next)
    {
        var messageType = context.Message.MessageType;

        if (ThrottlesByMessageType.TryGetValue(messageType, out var throttle))
        {
            await InvokeThrottled(throttle, messageType, context, next).ConfigureAwait(false);
        }
        else
        {
            // Not a throttled type: continue immediately.
            await next(context).ConfigureAwait(false);
        }
    }

    // Waits for a free slot in the throttle, runs the rest of the pipeline, then frees the slot.
    private static async Task InvokeThrottled(
        Throttle throttle,
        Type messageType,
        IIncomingLogicalMessageContext context,
        Func<IIncomingLogicalMessageContext, Task> next)
    {
        var entered = await throttle.Gate.WaitAsync(throttle.MaxWait).ConfigureAwait(false);

        if (!entered)
        {
            // The semaphore was never entered on this path, so it must not be released.
            throw new TimeoutException($"{messageType} throttle semaphore was not entered within {throttle.MaxWait}.");
        }

        try
        {
            await next(context).ConfigureAwait(false);
        }
        finally
        {
            // Free the slot whether the rest of the pipeline succeeded or threw.
            throttle.Gate.Release();
        }
    }

    // Pairs the semaphore guarding one message type with the longest time to wait for it.
    private class Throttle
    {
        public Throttle(int initialCount, TimeSpan maxWait)
        {
            Gate = new SemaphoreSlim(initialCount);
            MaxWait = maxWait;
        }

        // Semaphore whose free slots bound concurrent processing of the message type.
        public SemaphoreSlim Gate { get; }

        // Longest time a message waits for a slot before a TimeoutException is thrown.
        public TimeSpan MaxWait { get; }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment