Created
June 22, 2015 07:56
-
-
Save RhysC/26f2640fc051bc7627e0 to your computer and use it in GitHub Desktop.
Idempotency in NServiceBus using Azure Table Storage and IManageUnitsOfWork (brain dump)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using Microsoft.WindowsAzure.Storage; | |
using Microsoft.WindowsAzure.Storage.Table; | |
using NServiceBus; | |
using NServiceBus.UnitOfWork; | |
namespace MyCommon.UnitOfWorkManagers | |
{ | |
/// <summary>
/// Base contract for messages that pass through the idempotency check.
/// </summary>
public interface IBaseMessage
{
    /// <summary>
    /// Gets the identifier of this message. The Azure Table Storage
    /// implementation in this file uses it as the row key of the stored record.
    /// </summary>
    string MessageId
    {
        get;
    }
}
//http://docs.particular.net/nservicebus/pipeline/unit-of-work | |
/// <summary>
/// Unit-of-work hook that commits the pending work of the configured
/// <see cref="IIdempotencyManager"/> once a unit of work ends without an error.
/// </summary>
public class IdempotencyUnitOfWorkManager : IManageUnitsOfWork
{
    /// <summary>
    /// Gets or sets the manager that is committed in <see cref="End"/>.
    /// NOTE(review): presumably property-injected by the container - confirm the registration.
    /// </summary>
    public IIdempotencyManager IdempotencyManager { get; set; }

    /// <summary>
    /// Called when a unit of work starts; nothing needs to be prepared here.
    /// </summary>
    public void Begin()
    {
    }

    /// <summary>
    /// Called when a unit of work ends.
    /// </summary>
    /// <param name="ex">
    /// The exception that ended the unit of work, or <c>null</c> when it completed successfully.
    /// </param>
    public void End(Exception ex = null)
    {
        if (ex != null)
        {
            // The unit of work failed: nothing is committed.
            return;
        }

        IdempotencyManager.Commit();
    }
}
/// <summary>
/// Tracks which messages have been seen so that duplicates can be dropped.
/// </summary>
public interface IIdempotencyManager
{
    /// <summary>
    /// Checks whether <paramref name="message"/> has been processed before.
    /// </summary>
    /// <param name="message">The message to look up.</param>
    /// <returns><c>true</c> when the message has already been processed; otherwise <c>false</c>.</returns>
    bool HasMessageBeenProcessed(IBaseMessage message);

    /// <summary>
    /// Registers <paramref name="message"/> as received.
    /// </summary>
    /// <param name="message">The message to register.</param>
    void InsertMessage(IBaseMessage message);

    /// <summary>
    /// Commits the work registered through <see cref="InsertMessage"/>.
    /// </summary>
    void Commit();
}
/// <summary>
/// Message handler that stops further dispatching of a message that has already
/// been processed, and registers the message otherwise.
/// NOTE(review): presumably this handler has to run before the business handlers
/// for the drop to have any effect - confirm the handler ordering configuration.
/// </summary>
public class IdempotencyHandler : IHandleMessages<IBaseMessage>
{
    /// <summary>Gets or sets the bus used to abort dispatching of a duplicate message.</summary>
    public IBus Bus { get; set; }

    /// <summary>Gets or sets the manager used to look up and register messages.</summary>
    public IIdempotencyManager IdempotencyManager { get; set; }

    /// <summary>
    /// Registers a message that has not been processed yet; a message that has
    /// been processed is not dispatched to any further handlers.
    /// </summary>
    /// <param name="message">The incoming message.</param>
    public void Handle(IBaseMessage message)
    {
        var alreadyProcessed = IdempotencyManager.HasMessageBeenProcessed(message);
        if (!alreadyProcessed)
        {
            IdempotencyManager.InsertMessage(message);
            return;
        }

        //TODO - Log the fact we are going to drop the message
        Bus.DoNotContinueDispatchingCurrentMessageToHandlers();
    }
}
/// <summary>
/// <see cref="IIdempotencyManager"/> that records received messages in the
/// "RecievedMessages" Azure Storage table. Inserts are buffered in memory by
/// <see cref="InsertMessage"/> and only written when <see cref="Commit"/> is called.
/// </summary>
/// <remarks>
/// NOTE(review): assumes the table already exists (it is never created here) and that
/// an instance is not shared between concurrently running units of work - confirm both.
/// </remarks>
public class AzureTableStorageIdempotencyManager : IIdempotencyManager
{
    // Resolved on first use so that constructing the manager does not touch the storage client.
    private readonly Lazy<CloudTable> _messageTable;

    // Messages registered since the last successful commit.
    private readonly List<RecievedMessage> _messagesToInsert = new List<RecievedMessage>();

    /// <summary>
    /// Creates a manager that works against the given storage account.
    /// </summary>
    /// <param name="cloudStorageAccount">The storage account that holds the message table.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="cloudStorageAccount"/> is <c>null</c>.
    /// </exception>
    public AzureTableStorageIdempotencyManager(CloudStorageAccount cloudStorageAccount)
    {
        // Fail here rather than with a NullReferenceException on first table access.
        if (cloudStorageAccount == null)
        {
            throw new ArgumentNullException("cloudStorageAccount");
        }

        _messageTable = new Lazy<CloudTable>(() =>
        {
            var cloudTableClient = cloudStorageAccount.CreateCloudTableClient();
            return cloudTableClient.GetTableReference("RecievedMessages");
        });
    }

    /// <summary>
    /// Looks the message up in the table by message type name and message id.
    /// </summary>
    /// <param name="message">The message to look up.</param>
    /// <returns>
    /// <c>true</c> when a record for the message exists in the table; otherwise <c>false</c>.
    /// </returns>
    public bool HasMessageBeenProcessed(IBaseMessage message)
    {
        var retrieveOperation = RecievedMessage.GetRetireveOptertion(message);
        var retrieveOperationResult = _messageTable.Value.Execute(retrieveOperation);

        // A retrieve that finds nothing leaves Result null.
        if (retrieveOperationResult.Result == null)
        {
            return false;
        }

        //log the fact we have received this message before - show message content including header (ie transport id etc)
        // Fix: the original returned false here as well, so duplicates were never detected.
        return true;
    }

    /// <summary>
    /// Buffers a record for the message; nothing is written until <see cref="Commit"/>.
    /// </summary>
    /// <param name="message">The message to register.</param>
    public void InsertMessage(IBaseMessage message)
    {
        _messagesToInsert.Add(new RecievedMessage(message));
    }

    /// <summary>
    /// Writes every buffered record to the table, one insert per record, then empties the buffer.
    /// </summary>
    public void Commit()
    {
        //Could be done as an Entity Group Transaction (https://msdn.microsoft.com/en-us/library/azure/dd894038.aspx)
        // however
        // - if there are multiple entries we don't know if they are of the same partition
        // - and typically there will only be one application level message per transport message anyway
        foreach (var recievedMessage in _messagesToInsert)
        {
            var insertOperation = TableOperation.Insert(recievedMessage);
            _messageTable.Value.Execute(insertOperation);
        }

        // Fix: drop the written records so a later Commit on the same instance does not
        // try to insert them again. Only reached when every insert above succeeded.
        _messagesToInsert.Clear();
    }

    /// <summary>
    /// Table entity for a received message: partitioned by message type name and
    /// keyed by message id.
    /// </summary>
    public class RecievedMessage : TableEntity
    {
        /// <summary>
        /// Creates the entity for the given message.
        /// </summary>
        /// <param name="message">The message the entity represents.</param>
        public RecievedMessage(IBaseMessage message)
        {
            PartitionKey = message.GetType().Name;
            RowKey = message.MessageId;
        }

        /// <summary>
        /// Builds the retrieve operation that looks up the entity for the given message,
        /// using the same partition and row key as the constructor.
        /// </summary>
        /// <param name="message">The message to look up.</param>
        /// <returns>A retrieve operation for the message's partition and row key.</returns>
        public static TableOperation GetRetireveOptertion(IBaseMessage message)
        {
            return TableOperation.Retrieve(message.GetType().Name, message.MessageId);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment