Skip to content

Instantly share code, notes, and snippets.

@RhysC
Created June 22, 2015 07:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RhysC/26f2640fc051bc7627e0 to your computer and use it in GitHub Desktop.
Save RhysC/26f2640fc051bc7627e0 to your computer and use it in GitHub Desktop.
Idempotency in NSeviceBus using Azure Table Storage and IManageUnitOfwork (brain dump)
using System;
using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using NServiceBus;
using NServiceBus.UnitOfWork;
namespace MyCommon.UnitOfWorkManagers
{
public interface IBaseMessage
{
string MessageId { get; }
}
//http://docs.particular.net/nservicebus/pipeline/unit-of-work
public class IdempotencyUnitOfWorkManager : IManageUnitsOfWork
{
public IIdempotencyManager IdempotencyManager { get; set; }
public void Begin() { }
public void End(Exception ex = null)
{
if (ex == null)
{
IdempotencyManager.Commit();
}
}
}
public interface IIdempotencyManager
{
bool HasMessageBeenProcessed(IBaseMessage message);
void InsertMessage(IBaseMessage message);
void Commit();
}
public class IdempotencyHandler : IHandleMessages<IBaseMessage>
{
public IBus Bus { get; set; }
public IIdempotencyManager IdempotencyManager { get; set; }
public void Handle(IBaseMessage message)
{
if (IdempotencyManager.HasMessageBeenProcessed(message))
{
//TODO - Log the fact we are going to drop the message
Bus.DoNotContinueDispatchingCurrentMessageToHandlers();
}
else
{
IdempotencyManager.InsertMessage(message);
}
}
}
public class AzureTableStorageIdempotencyManager : IIdempotencyManager
{
private readonly Lazy<CloudTable> _messageTable;
private readonly List<RecievedMessage> _messagesToInsert = new List<RecievedMessage>();
public AzureTableStorageIdempotencyManager(CloudStorageAccount cloudStorageAccount)
{
_messageTable = new Lazy<CloudTable>(() =>
{
var cloudTableClient = cloudStorageAccount.CreateCloudTableClient();
return cloudTableClient.GetTableReference("RecievedMessages");
});
}
public bool HasMessageBeenProcessed(IBaseMessage message)
{
var retrieveOperation = RecievedMessage.GetRetireveOptertion(message);
var retrieveOperationResult = _messageTable.Value.Execute(retrieveOperation);
var loggedMessageResult = retrieveOperationResult.Result as DynamicTableEntity;
if (loggedMessageResult == null)
return false;
//log the fact we have recieved this message before - show message content including hedaer (ie transport id etc)
return false;
}
public void InsertMessage(IBaseMessage message)
{
_messagesToInsert.Add(new RecievedMessage(message));
}
public void Commit()
{
//Could be done as an Entity Group Transactions (https://msdn.microsoft.com/en-us/library/azure/dd894038.aspx)
// however
// - if there are multiple entires we dont know if they of the same partion
// - and typically there will only be one application level message per tranasport message anyway
foreach (var recievedMessage in _messagesToInsert)
{
var insertOperation = TableOperation.Insert(recievedMessage);
_messageTable.Value.Execute(insertOperation);
}
}
public class RecievedMessage : TableEntity
{
public RecievedMessage(IBaseMessage message)
{
PartitionKey = message.GetType().Name;
RowKey = message.MessageId;
}
public static TableOperation GetRetireveOptertion(IBaseMessage message)
{
return TableOperation.Retrieve(message.GetType().Name, message.MessageId);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment