Skip to content

Instantly share code, notes, and snippets.

@randomcodenz
Last active August 29, 2015 14:01
Show Gist options
  • Save randomcodenz/d13f308c2544cff3a856 to your computer and use it in GitHub Desktop.
Save randomcodenz/d13f308c2544cff3a856 to your computer and use it in GitHub Desktop.
// Explicit opt-in message versioning via a marker interface and exchange to exchange bindings to
// route messages from publishing exchange to previous message version exchanges.
// Message serialisation adds all message versions into the message headers and deserialisation
// keeps trying the listed types until it finds one it can create (or runs out and throws an exception)
/// <summary>
/// Marker interface used to opt a message type in to versioning: a message class that
/// implements <c>ISupersede&lt;T&gt;</c> declares that it replaces the older message type
/// <typeparamref name="T"/>. The interface has no members; it is only discovered via reflection.
/// </summary>
/// <typeparam name="T">The previous message version that the implementing type supersedes.</typeparam>
public interface ISupersede<T> where T : class { }
/// <summary>
/// Advanced bus with opt-in message versioning. On publish, the serialized names of every
/// message type superseded by the published type (see <see cref="ISupersede{T}"/>) are written
/// to the <c>Alternative-Message-Types</c> header. On consume, if the declared message type
/// cannot be resolved, the types listed in that header are tried in order until one resolves.
/// </summary>
public class RabbitAdvancedBusWithVersionSupport : RabbitAdvancedBus, IAdvancedBus
{
    private const string AlternativeMessageTypesHeaderKey = "Alternative-Message-Types";
    private const string AlternativeMessageTypeSeparator = ";";

    private readonly ISerializer _serializer;
    private readonly Func<string> _getCorrelationId;
    private readonly ITypeNameSerializer _typeNameSerializer;
    private readonly IHandlerCollectionFactory _handlerCollectionFactory;

    /// <summary>
    /// Creates the bus. All dependencies are forwarded to the base class; the ones needed by the
    /// versioning-aware publish and consume paths are also kept locally.
    /// </summary>
    public RabbitAdvancedBusWithVersionSupport(
        IConnectionFactory connectionFactory,
        ISerializer serializer,
        IConsumerFactory consumerFactory,
        IEasyNetQLogger logger,
        Func<string> getCorrelationId,
        IClientCommandDispatcherFactory clientCommandDispatcherFactory,
        IPublisher publisher,
        IEventBus eventBus,
        ITypeNameSerializer typeNameSerializer,
        IHandlerCollectionFactory handlerCollectionFactory,
        IContainer container)
        : base(
            connectionFactory,
            serializer,
            consumerFactory,
            logger,
            getCorrelationId,
            clientCommandDispatcherFactory,
            publisher,
            eventBus,
            typeNameSerializer,
            handlerCollectionFactory,
            container)
    {
        _serializer = serializer;
        _getCorrelationId = getCorrelationId;
        _typeNameSerializer = typeNameSerializer;
        _handlerCollectionFactory = handlerCollectionFactory;
    }

    /// <summary>
    /// Serializes and publishes a typed message, stamping the message type, a correlation id
    /// (only when none is set) and the alternative (superseded) message types onto its properties.
    /// </summary>
    /// <exception cref="ArgumentNullException">exchange, routingKey or message is null.</exception>
    /// <exception cref="ArgumentException">routingKey is longer than 255 characters.</exception>
    public override Task PublishAsync<T>(IExchange exchange, string routingKey, bool mandatory, bool immediate, IMessage<T> message)
    {
        CheckNotNull(exchange, "exchange");
        CheckShortString(routingKey, "routingKey");
        CheckNotNull(message, "message");

        var typeName = _typeNameSerializer.Serialize(message.Body.GetType());
        var messageBody = _serializer.MessageToBytes(message.Body);

        message.Properties.Type = typeName;
        if (string.IsNullOrEmpty(message.Properties.CorrelationId))
        {
            message.Properties.CorrelationId = _getCorrelationId();
        }

        AddAlternativeMessageTypes(message);

        return PublishAsync(exchange, routingKey, mandatory, immediate, message.Properties, messageBody);
    }

    /// <summary>
    /// Consumes a queue with a single typed handler by registering it in a handler collection
    /// and delegating to the multi-handler overload.
    /// </summary>
    /// <exception cref="ArgumentNullException">queue, onMessage or configure is null.</exception>
    public new IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage, Action<IConsumerConfiguration> configure) where T : class
    {
        CheckNotNull(queue, "queue");
        CheckNotNull(onMessage, "onMessage");
        CheckNotNull(configure, "configure");

        return Consume(queue, x => x.Add(onMessage), configure);
    }

    /// <summary>
    /// Consumes a queue with the supplied handlers and a no-op consumer configuration.
    /// </summary>
    public override IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers)
    {
        return Consume(queue, addHandlers, x => { });
    }

    /// <summary>
    /// Consumes a queue, resolving each incoming message's type with fallback to the alternative
    /// types listed in its headers before deserializing it and dispatching to the matching handler.
    /// </summary>
    /// <exception cref="ArgumentNullException">queue, addHandlers or configure is null.</exception>
    public new IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers, Action<IConsumerConfiguration> configure)
    {
        CheckNotNull(queue, "queue");
        CheckNotNull(addHandlers, "addHandlers");
        CheckNotNull(configure, "configure");

        var handlerCollection = _handlerCollectionFactory.CreateHandlerCollection();
        addHandlers(handlerCollection);

        return Consume(queue, (body, properties, messageReceivedInfo) =>
        {
            var messageType = GetMessageType(properties);

            // Re-serialize the *resolved* type: it may be an older version than the one the
            // publisher declared, and the body must be deserialized as the type we can create.
            var messageTypeString = _typeNameSerializer.Serialize(messageType);
            var handler = handlerCollection.GetHandler(messageType);
            var messageBody = _serializer.BytesToMessage(messageTypeString, body);
            var message = Message.CreateInstance(messageType, messageBody);
            message.SetProperties(properties);
            return handler(message, messageReceivedInfo);
        }, configure);
    }

    /// <summary>
    /// Walks the supersede chain starting at <typeparamref name="T"/> and stores the serialized
    /// names of all superseded types, separated by ';', in the alternative-types header.
    /// The header is written even when the chain is empty (as an empty string).
    /// </summary>
    private void AddAlternativeMessageTypes<T>(IMessage<T> message)
    {
        // NOTE(review): Properties.Type is derived from the runtime type of message.Body while
        // the alternatives are derived from the static type T - confirm this is intended.
        var messageType = typeof(T);
        var alternativeMessageTypes = new List<string>();

        // Guards against a cyclic ISupersede<> declaration, which would otherwise loop forever.
        var visitedTypes = new HashSet<Type> { messageType };

        var supersededType = GetSupersededMessageType(messageType);
        while (supersededType != null && visitedTypes.Add(supersededType))
        {
            alternativeMessageTypes.Add(_typeNameSerializer.Serialize(supersededType));
            supersededType = GetSupersededMessageType(supersededType);
        }

        var alternativeTypes = string.Join(AlternativeMessageTypeSeparator, alternativeMessageTypes);

        // Assign through the indexer rather than Add(): Add() throws if the key already exists,
        // e.g. when the same message instance is published more than once.
        message.Properties.Headers[AlternativeMessageTypesHeaderKey] = alternativeTypes;
    }

    /// <summary>
    /// Returns the type argument of the first <see cref="ISupersede{T}"/> interface implemented by
    /// <paramref name="messageType"/>, or null when it implements none. Type.GetInterfaces() also
    /// reports inherited interfaces and guarantees no particular order, so a type exposing several
    /// ISupersede&lt;&gt; interfaces yields whichever is returned first.
    /// </summary>
    private static Type GetSupersededMessageType(Type messageType)
    {
        return messageType
            .GetInterfaces()
            .Where(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ISupersede<>))
            .SelectMany(t => t.GetGenericArguments())
            .FirstOrDefault();
    }

    /// <summary>
    /// Resolves the CLR type of an incoming message: first from the declared type name, then
    /// from each entry of the alternative-types header in order.
    /// </summary>
    /// <exception cref="EasyNetQException">
    /// No type could be resolved, or the alternative-types header has an unsupported value type.
    /// </exception>
    private Type GetMessageType(MessageProperties messageProperties)
    {
        Type messageType;
        if (TryGetType(messageProperties.Type, out messageType))
        {
            return messageType;
        }

        object alternativeTypeHeaderValue;
        if (!messageProperties.HeadersPresent ||
            !messageProperties.Headers.TryGetValue(AlternativeMessageTypesHeaderKey, out alternativeTypeHeaderValue))
        {
            throw new EasyNetQException("Cannot find type {0}", messageProperties.Type);
        }

        // NOTE(review): the header is expected to arrive as raw bytes (presumably how the client
        // library delivers string header values - confirm); a plain string is accepted as well.
        var alternativeTypeHeader = alternativeTypeHeaderValue as string;
        if (alternativeTypeHeader == null)
        {
            var alternativeTypeHeaderRaw = alternativeTypeHeaderValue as byte[];
            if (alternativeTypeHeaderRaw == null)
            {
                throw new EasyNetQException("Unable to extract raw {0} header as byte[]", AlternativeMessageTypesHeaderKey);
            }

            // Encoding.GetString never returns null for a non-null array, so no further check is needed.
            alternativeTypeHeader = Encoding.UTF8.GetString(alternativeTypeHeaderRaw);
        }

        var alternativeTypes = alternativeTypeHeader.Split(new[] { AlternativeMessageTypeSeparator }, StringSplitOptions.RemoveEmptyEntries);
        foreach (var alternativeType in alternativeTypes)
        {
            if (TryGetType(alternativeType, out messageType))
            {
                return messageType;
            }
        }

        throw new EasyNetQException("Cannot find declared message type {0} or any of the specified alternative types {1}", messageProperties.Type, alternativeTypeHeader);
    }

    /// <summary>
    /// Attempts to turn a serialized type name into a type. Returns false (and a null type)
    /// when the type name serializer throws for any reason.
    /// </summary>
    private bool TryGetType(string typeString, out Type messageType)
    {
        try
        {
            messageType = _typeNameSerializer.DeSerialize(typeString);
            return true;
        }
        catch (Exception)
        {
            // Deliberately best-effort: an unresolvable name just means "try the next candidate".
            messageType = null;
            return false;
        }
    }

    /// <summary>Throws <see cref="ArgumentNullException"/> with a default message when value is null.</summary>
    private static void CheckNotNull<T>(T value, string name) where T : class
    {
        CheckNotNull(value, name, string.Format("{0} must not be null", name));
    }

    /// <summary>
    /// Ensures value is non-null and at most 255 characters long.
    /// </summary>
    /// <exception cref="ArgumentNullException">value is null.</exception>
    /// <exception cref="ArgumentException">value is longer than 255 characters.</exception>
    private static void CheckShortString(string value, string name)
    {
        CheckNotNull(value, name);
        if (value.Length > 255)
        {
            throw new ArgumentException(string.Format("Argument '{0}' must be less than or equal to 255 characters.", name));
        }
    }

    /// <summary>
    /// Throws <see cref="ArgumentNullException"/> with the given message when value is null.
    /// The name and message arguments themselves must not be blank.
    /// </summary>
    private static void CheckNotNull<T>(T value, string name, string message) where T : class
    {
        CheckNotBlank(name, "name", "name must not be blank");
        CheckNotBlank(message, "message", "message must not be blank");
        if (value == null)
        {
            throw new ArgumentNullException(name, message);
        }
    }

    /// <summary>
    /// Throws <see cref="ArgumentException"/> when value is null, empty or whitespace.
    /// The name and message arguments are validated first, in that order.
    /// </summary>
    private static void CheckNotBlank(string value, string name, string message)
    {
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentException("name must not be blank", "name");
        }
        if (string.IsNullOrWhiteSpace(message))
        {
            throw new ArgumentException("message must not be blank", "message");
        }
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException(message, name);
        }
    }
}
/// <summary>
/// Publish-exchange declare strategy that, in addition to declaring (and caching) the exchange
/// for a message type, declares the exchanges of every message type it supersedes (see
/// <see cref="ISupersede{T}"/>) and binds each newer exchange to the next older one with the
/// routing key "#", so messages published to the newest exchange are routed down the chain.
/// </summary>
public class VersionedPublishExchangeDeclareStrategy : IPublishExchangeDeclareStrategy
{
    // Exchanges already declared, keyed by exchange name.
    private readonly ConcurrentDictionary<string, IExchange> exchangeNames =
        new ConcurrentDictionary<string, IExchange>();

    // Message types whose supersede chain has already been declared and bound, so the
    // bindings are not re-issued on every DeclareExchange call.
    private readonly ConcurrentDictionary<Type, bool> versionedMessageTypes =
        new ConcurrentDictionary<Type, bool>();

    /// <summary>
    /// Returns the exchange with the given name, declaring it on the bus the first time the
    /// name is seen and returning the cached instance afterwards.
    /// </summary>
    public IExchange DeclareExchange(IAdvancedBus advancedBus, string exchangeName, string exchangeType)
    {
        return exchangeNames.GetOrAdd(
            exchangeName,
            name => advancedBus.ExchangeDeclare(name, exchangeType));
    }

    /// <summary>
    /// Declares the exchange for <paramref name="messageType"/> (named by the bus's conventions)
    /// and, the first time the type is seen, sets up the exchange-to-exchange bindings for its
    /// superseded message types.
    /// </summary>
    /// <returns>The exchange that messages of <paramref name="messageType"/> are published to.</returns>
    public IExchange DeclareExchange(IAdvancedBus advancedBus, Type messageType, string exchangeType)
    {
        var conventions = advancedBus.Container.Resolve<IConventions>();
        var exchangeName = conventions.ExchangeNamingConvention(messageType);
        var publishExchange = DeclareExchange(advancedBus, exchangeName, exchangeType);

        // Record the type only after the bindings succeeded, so a failed attempt is retried on
        // the next call. Concurrent first calls may both bind; they create the same bindings.
        if (!versionedMessageTypes.ContainsKey(messageType))
        {
            ConfigureMessageVersioning(publishExchange, advancedBus, messageType, exchangeType);
            versionedMessageTypes.TryAdd(messageType, true);
        }

        return publishExchange;
    }

    /// <summary>
    /// Walks the supersede chain of <paramref name="messageType"/>, declaring each superseded
    /// type's exchange and binding the previous exchange in the chain to it.
    /// </summary>
    private void ConfigureMessageVersioning(IExchange source, IAdvancedBus advancedBus, Type messageType, string exchangeType)
    {
        var supersededMessageType = GetSupersededMessageType(messageType);
        if (supersededMessageType == null)
        {
            return;
        }

        var conventions = advancedBus.Container.Resolve<IConventions>();

        // Tracks visited types so a cyclic ISupersede<> declaration terminates instead of
        // recursing without bound.
        var visitedTypes = new System.Collections.Generic.HashSet<Type>();
        visitedTypes.Add(messageType);

        var newerExchange = source;
        while (supersededMessageType != null && visitedTypes.Add(supersededMessageType))
        {
            var supersededExchangeName = conventions.ExchangeNamingConvention(supersededMessageType);
            var supersededExchange = DeclareExchange(advancedBus, supersededExchangeName, exchangeType);
            advancedBus.Bind(newerExchange, supersededExchange, "#");

            newerExchange = supersededExchange;
            supersededMessageType = GetSupersededMessageType(supersededMessageType);
        }
    }

    /// <summary>
    /// Returns the type argument of the first <see cref="ISupersede{T}"/> interface implemented by
    /// <paramref name="messageType"/>, or null when it implements none.
    /// </summary>
    private static Type GetSupersededMessageType(Type messageType)
    {
        return messageType
            .GetInterfaces()
            .Where(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ISupersede<>))
            .SelectMany(t => t.GetGenericArguments())
            .FirstOrDefault();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment