Skip to content

Instantly share code, notes, and snippets.

@Nisden
Created February 6, 2023 12:39
Show Gist options
  • Save Nisden/216bf3bef908217b9cc469b7bc4ff764 to your computer and use it in GitHub Desktop.
Save Nisden/216bf3bef908217b9cc469b7bc4ff764 to your computer and use it in GitHub Desktop.
Rebus Single Receiver Decorator
using Rebus.Messages;
using Rebus.Transport;
namespace Web.RebusExtensions
{
/// <summary>
/// Decorates an <see cref="ITransport"/> so that only one caller at a time invokes
/// <c>Receive</c> on the wrapped transport, and so that the wrapped transport is not
/// asked again for a short cooldown after it returned no message.
/// </summary>
/// <remarks>
/// All other members are forwarded to the wrapped transport unchanged.
/// </remarks>
public sealed class SingleReceiverTransportDecorator : ITransport, IDisposable
{
    // How long Receive short-circuits to null after the wrapped transport returned no message.
    private static readonly TimeSpan emptyMessageCooldown = TimeSpan.FromSeconds(1);

    private readonly ITransport transport;

    // Binary semaphore (initial and maximum count 1) serializing calls to Receive.
    private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);

    // UTC instant before which Receive returns null without calling the wrapped transport.
    // Only read and written while the semaphore is held. Null until the first empty receive.
    private DateTime? nextReceiveAllowed;

    /// <summary>Gets the address reported by the wrapped transport.</summary>
    public string Address => transport.Address;

    /// <summary>
    /// Creates a decorator around <paramref name="transport"/>.
    /// </summary>
    /// <param name="transport">The transport to forward all calls to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="transport"/> is null.</exception>
    public SingleReceiverTransportDecorator(ITransport transport)
    {
        this.transport = transport ?? throw new ArgumentNullException(nameof(transport));
    }

    /// <summary>Forwards queue creation to the wrapped transport.</summary>
    /// <param name="address">The queue address passed through to the wrapped transport.</param>
    public void CreateQueue(string address) => transport.CreateQueue(address);

    /// <summary>Forwards the send to the wrapped transport.</summary>
    /// <param name="destinationAddress">The destination passed through to the wrapped transport.</param>
    /// <param name="message">The message passed through to the wrapped transport.</param>
    /// <param name="context">The transaction context passed through to the wrapped transport.</param>
    public async Task Send(string destinationAddress, TransportMessage message, ITransactionContext context) => await transport.Send(destinationAddress, message, context);

    /// <summary>
    /// Receives the next message from the wrapped transport, allowing only one receive
    /// operation to be in flight at a time.
    /// </summary>
    /// <param name="context">The transaction context passed through to the wrapped transport.</param>
    /// <param name="cancellationToken">
    /// Cancels both the wait for the semaphore and the wrapped receive call.
    /// </param>
    /// <returns>
    /// The message returned by the wrapped transport, or null when the wrapped transport
    /// returned null or when the call falls inside the cooldown that follows an empty receive.
    /// </returns>
    public async Task<TransportMessage?> Receive(ITransactionContext context, CancellationToken cancellationToken)
    {
        // Acquired outside the try block: if the wait is cancelled the semaphore was never
        // taken, so it must not be released.
        await semaphoreSlim.WaitAsync(cancellationToken);
        try
        {
            // Skip calling Receive if we recently observed an empty retrieve.
            // When nextReceiveAllowed is null the lifted comparison is false, so we proceed.
            if (nextReceiveAllowed > DateTime.UtcNow)
            {
                return null;
            }

            // Retrieve from the wrapped transport.
            var result = await transport.Receive(context, cancellationToken);
            if (result == null)
            {
                // Nothing available: start the cooldown window.
                nextReceiveAllowed = DateTime.UtcNow.Add(emptyMessageCooldown);
            }

            return result;
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Disposes the semaphore owned by this decorator. The wrapped transport is not disposed here.
    /// </summary>
    public void Dispose()
    {
        semaphoreSlim.Dispose();
        GC.SuppressFinalize(this);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment