Skip to content

Instantly share code, notes, and snippets.

@Soopster
Created April 16, 2015 09:32
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Soopster/dd0fbd754a65fc5edfa9 to your computer and use it in GitHub Desktop.
Save Soopster/dd0fbd754a65fc5edfa9 to your computer and use it in GitHub Desktop.
Message Lock Renewal For Azure Service Bus Brokered Messages
private async Task OnMessageReceivedAsync(BrokeredMessage receivedMessage)
{
try{
var messageLock = new MessageLock(receivedMessage)){
// Process receivedMessage
await messageLock.CompleteAsync();
}
finally {
messageLock.Dispose();
}
}
uses Nuget
<package id="Nito.AsyncEx" version="3.0.0" targetFramework="net45" />
public class MessageLock : IDisposable
{
private readonly AsyncLock _asyncLock = new AsyncLock();
private BrokeredMessage _message;
private Timer _timer;
public MessageLock(BrokeredMessage message)
{
_message = message;
InitializeTimer();
}
public void Dispose()
{
AbandonAsync().Wait();
}
public async Task CompleteAsync()
{
using (await _asyncLock.LockAsync())
{
if (_timer != null)
{
_timer.Dispose();
}
if (_message == null)
{
return;
}
try
{
await _message.CompleteAsync().ConfigureAwait(false);
}
catch (Exception e)
{
throw;
}
finally
{
_message = null;
}
}
}
public async Task AbandonAsync()
{
using (await _asyncLock.LockAsync())
{
if (_timer != null)
{
_timer.Dispose();
}
if (_message == null)
{
return;
}
try
{
await _message.AbandonAsync().ConfigureAwait(false);
}
catch (Exception e)
{
throw;
}
finally
{
_message = null;
}
}
}
public async Task DeadLetterAsync(string deadLetterReason, string deadLetterDescription)
{
using (await _asyncLock.LockAsync())
{
if (_timer != null)
{
_timer.Dispose();
}
if (_message == null)
{
return;
}
try
{
await _message.DeadLetterAsync(deadLetterReason, deadLetterDescription).ConfigureAwait(false);
}
catch (Exception e)
{
throw;
}
finally
{
_message = null;
}
}
}
private void InitializeTimer()
{
var lockedUntil = _message.LockedUntilUtc.Subtract(DateTime.UtcNow);
var renewInterval = new TimeSpan(
(long) Math.Round(lockedUntil.Ticks*0.7,
0,
MidpointRounding.AwayFromZero));
if (renewInterval.TotalMilliseconds < 0)
{
Trace.TraceError(string.Format("Invalid message lock renewel value {0} for message {1}", renewInterval, _message.MessageId));
return;
}
_timer = new Timer(async state =>
{
using (await _asyncLock.LockAsync())
{
if (_message == null)
{
return;
}
try
{
Trace.WriteLine("Renewing Message Lock for message id: " + _message.MessageId);
await _message.RenewLockAsync().ConfigureAwait(false);
_timer.Change(renewInterval, TimeSpan.FromMilliseconds(-1));
}
catch (Exception e)
{
Trace.TraceError(string.Format("Error renewing message lock for message Id: {0} {1} {2}", _message.MessageId, Environment.NewLine, e));
_message = null;
}
}
},
null,
renewInterval,
TimeSpan.FromMilliseconds(-1));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment