Created
April 16, 2015 09:32
-
-
Save Soopster/dd0fbd754a65fc5edfa9 to your computer and use it in GitHub Desktop.
Message Lock Renewal For Azure Service Bus Brokered Messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Example message handler: wraps the received message in a <see cref="MessageLock"/>
/// for the duration of processing and completes it when processing succeeds.
/// </summary>
/// <param name="receivedMessage">The brokered message handed to this handler.</param>
private async Task OnMessageReceivedAsync(BrokeredMessage receivedMessage)
{
    // The using statement guarantees Dispose() runs on every exit path. Dispose()
    // abandons the message unless it has already been completed below, so a failure
    // while processing leaves the message abandoned rather than locked.
    using (var messageLock = new MessageLock(receivedMessage))
    {
        // Process receivedMessage

        await messageLock.CompleteAsync();
    }
}
Requires the following NuGet package (it provides the AsyncLock type used below):
<package id="Nito.AsyncEx" version="3.0.0" targetFramework="net45" /> | |
/// <summary>
/// Wraps a <see cref="BrokeredMessage"/> and renews its lock on a timer until the
/// message is completed, abandoned or dead-lettered through this wrapper.
/// </summary>
/// <remarks>
/// All access to the wrapped message is serialized through an <c>AsyncLock</c>, so a
/// renewal never overlaps a settlement call. Once the message has been settled (or a
/// renewal has failed) the message reference is cleared and every later
/// settlement call becomes a no-op.
/// </remarks>
public class MessageLock : IDisposable
{
    private readonly AsyncLock _asyncLock = new AsyncLock();
    private BrokeredMessage _message;
    private Timer _timer;

    /// <summary>
    /// Starts tracking <paramref name="message"/> and schedules the first lock renewal.
    /// </summary>
    /// <param name="message">The message whose lock should be kept alive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public MessageLock(BrokeredMessage message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        _message = message;
        InitializeTimer();
    }

    /// <summary>
    /// Stops the renewal timer and abandons the message if it has not been settled yet.
    /// </summary>
    /// <remarks>
    /// Blocks until the abandon call finishes. A failure to abandon is traced instead of
    /// thrown, so that disposing from a <c>finally</c>/<c>using</c> block cannot replace
    /// the exception that is already propagating.
    /// </remarks>
    public void Dispose()
    {
        try
        {
            AbandonAsync().Wait();
        }
        catch (Exception e)
        {
            Trace.TraceError("Error abandoning message while disposing the message lock: {0}", e);
        }
    }

    /// <summary>
    /// Stops lock renewal and completes the message. Does nothing if the message has
    /// already been settled or its reference was dropped after a failed renewal.
    /// </summary>
    /// <returns>A task that finishes when the complete call has finished.</returns>
    public Task CompleteAsync()
    {
        return SettleAsync(message => message.CompleteAsync());
    }

    /// <summary>
    /// Stops lock renewal and abandons the message. Does nothing if the message has
    /// already been settled or its reference was dropped after a failed renewal.
    /// </summary>
    /// <returns>A task that finishes when the abandon call has finished.</returns>
    public Task AbandonAsync()
    {
        return SettleAsync(message => message.AbandonAsync());
    }

    /// <summary>
    /// Stops lock renewal and dead-letters the message. Does nothing if the message has
    /// already been settled or its reference was dropped after a failed renewal.
    /// </summary>
    /// <param name="deadLetterReason">Reason passed through to the dead-letter call.</param>
    /// <param name="deadLetterDescription">Description passed through to the dead-letter call.</param>
    /// <returns>A task that finishes when the dead-letter call has finished.</returns>
    public Task DeadLetterAsync(string deadLetterReason, string deadLetterDescription)
    {
        return SettleAsync(message => message.DeadLetterAsync(deadLetterReason, deadLetterDescription));
    }

    /// <summary>
    /// Shared implementation of the three settlement operations: disposes the renewal
    /// timer, then runs <paramref name="settle"/> on the message at most once.
    /// </summary>
    /// <param name="settle">The settlement call to run against the wrapped message.</param>
    private async Task SettleAsync(Func<BrokeredMessage, Task> settle)
    {
        using (await _asyncLock.LockAsync())
        {
            if (_timer != null)
            {
                _timer.Dispose();
            }

            var message = _message;
            if (message == null)
            {
                return;
            }

            // Clear the reference before awaiting so the message is never settled twice,
            // whether the call below succeeds or throws. Exceptions from the settlement
            // call propagate to the caller unchanged.
            _message = null;
            await settle(message).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Computes the renewal interval as 70% of the time remaining until the message's
    /// <c>LockedUntilUtc</c> and arms a one-shot timer that is re-armed after every
    /// successful renewal.
    /// </summary>
    private void InitializeTimer()
    {
        var lockedUntil = _message.LockedUntilUtc.Subtract(DateTime.UtcNow);
        var renewInterval = new TimeSpan(
            (long) Math.Round(lockedUntil.Ticks * 0.7, 0, MidpointRounding.AwayFromZero));

        if (renewInterval.TotalMilliseconds < 0)
        {
            // LockedUntilUtc is already in the past; no timer is created, so the lock is
            // never renewed by this instance.
            Trace.TraceError("Invalid message lock renewel value {0} for message {1}", renewInterval, _message.MessageId);
            return;
        }

        // One-shot timer (infinite period): the callback re-arms it itself, so renewals
        // cannot pile up if one takes longer than the interval.
        _timer = new Timer(
            async state => await RenewMessageLockAsync(renewInterval).ConfigureAwait(false),
            null,
            renewInterval,
            Timeout.InfiniteTimeSpan);
    }

    /// <summary>
    /// Timer callback body: renews the message lock and schedules the next renewal.
    /// </summary>
    /// <param name="renewInterval">Delay before the next renewal attempt.</param>
    /// <remarks>
    /// Any failure is traced and the message reference is dropped, which stops further
    /// renewals and turns later settlement calls on this instance into no-ops.
    /// </remarks>
    private async Task RenewMessageLockAsync(TimeSpan renewInterval)
    {
        using (await _asyncLock.LockAsync())
        {
            if (_message == null)
            {
                // Already settled (or a previous renewal failed); nothing to renew.
                return;
            }

            try
            {
                Trace.WriteLine("Renewing Message Lock for message id: " + _message.MessageId);
                await _message.RenewLockAsync().ConfigureAwait(false);
                _timer.Change(renewInterval, Timeout.InfiniteTimeSpan);
            }
            catch (Exception e)
            {
                Trace.TraceError("Error renewing message lock for message Id: {0} {1} {2}", _message.MessageId, Environment.NewLine, e);
                _message = null;
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment