Skip to content

Instantly share code, notes, and snippets.

@arielmoraes
Created July 27, 2018 15:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arielmoraes/5c44170d53c8a5c54543d52027d76759 to your computer and use it in GitHub Desktop.
Save arielmoraes/5c44170d53c8a5c54543d52027d76759 to your computer and use it in GitHub Desktop.
Rebus Custom Json Serializer for supporting arbitrary names to type mappings
using Newtonsoft.Json;
using Rebus.Messages;
using Rebus.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Rebus.Extensions;
using System.Linq;
using System.Collections.Concurrent;
namespace RabbitRebusTests.Consumer
{
class CustomTypeNameJsonSerializer : ISerializer
{
/// <summary>
/// Proper content type when a message has been serialized with this serializer (or another compatible JSON serializer) and it uses the standard UTF8 encoding
/// </summary>
public const string JsonUtf8ContentType = "application/json;charset=utf-8";
/// <summary>
/// Contents type when the content is JSON
/// </summary>
public const string JsonContentType = "application/json";
static readonly JsonSerializerSettings DefaultSettings = new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.None
};
static readonly Encoding DefaultEncoding = Encoding.UTF8;
readonly JsonSerializerSettings _settings;
readonly Encoding _encoding;
readonly string _encodingHeaderValue;
readonly string _customTypeNameHeader;
public CustomTypeNameJsonSerializer(string customTypeNameHeader, Dictionary<string, Type> typesNamesMappings)
: this(DefaultSettings, DefaultEncoding, customTypeNameHeader, typesNamesMappings)
{
}
internal CustomTypeNameJsonSerializer(Encoding encoding, string customTypeNameHeader, Dictionary<string, Type> typesNamesMappings)
: this(DefaultSettings, encoding, customTypeNameHeader, typesNamesMappings)
{
}
internal CustomTypeNameJsonSerializer(JsonSerializerSettings jsonSerializerSettings, string customTypeNameHeader, Dictionary<string, Type> typesNamesMappings)
: this(jsonSerializerSettings, DefaultEncoding, customTypeNameHeader, typesNamesMappings)
{
}
internal CustomTypeNameJsonSerializer(JsonSerializerSettings jsonSerializerSettings, Encoding encoding, string customTypeNameHeader, Dictionary<string, Type> typesNamesMappings)
{
_settings = jsonSerializerSettings;
_encoding = encoding;
_customTypeNameHeader = customTypeNameHeader;
foreach (var kv in typesNamesMappings)
{
TypeCache[kv.Key] = kv.Value;
}
_encodingHeaderValue = $"{JsonContentType};charset={_encoding.WebName}";
}
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
/// <summary>
/// Serializes the given <see cref="Message"/> into a <see cref="TransportMessage"/>
/// </summary>
public async Task<TransportMessage> Serialize(Message message)
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
{
var jsonText = JsonConvert.SerializeObject(message.Body, _settings);
var bytes = _encoding.GetBytes(jsonText);
var headers = message.Headers.Clone();
headers[Headers.ContentType] = _encodingHeaderValue;
if (!headers.ContainsKey(Headers.Type))
{
headers[Headers.Type] = message.Body.GetType().GetSimpleAssemblyQualifiedName();
}
return new TransportMessage(headers, bytes);
}
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
/// <summary>
/// Deserializes the given <see cref="TransportMessage"/> back into a <see cref="Message"/>
/// </summary>
public async Task<Message> Deserialize(TransportMessage transportMessage)
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
{
var contentType = transportMessage.Headers.GetValue(Headers.ContentType);
if (contentType.Equals(JsonUtf8ContentType, StringComparison.OrdinalIgnoreCase))
{
return GetMessage(transportMessage, _encoding);
}
if (contentType.StartsWith(JsonContentType))
{
var encoding = GetEncoding(contentType);
return GetMessage(transportMessage, encoding);
}
throw new FormatException($"Unknown content type: '{contentType}' - must be '{JsonContentType}' (e.g. '{JsonUtf8ContentType}') for the JSON serialier to work");
}
Encoding GetEncoding(string contentType)
{
var parts = contentType.Split(';');
var charset = parts
.Select(token => token.Split('='))
.Where(tokens => tokens.Length == 2)
.FirstOrDefault(tokens => tokens[0] == "charset");
if (charset == null)
{
return _encoding;
}
var encodingName = charset[1];
try
{
return Encoding.GetEncoding(encodingName);
}
catch (Exception exception)
{
throw new FormatException($"Could not turn charset '{encodingName}' into proper encoding!", exception);
}
}
Message GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
{
var bodyString = bodyEncoding.GetString(transportMessage.Body);
var type = GetTypeOrNull(transportMessage);
var bodyObject = Deserialize(bodyString, type);
var headers = transportMessage.Headers.Clone();
return new Message(headers, bodyObject);
}
static readonly ConcurrentDictionary<string, Type> TypeCache = new ConcurrentDictionary<string, Type>();
Type GetTypeOrNull(TransportMessage transportMessage)
{
Type foundType;
if (transportMessage.Headers.TryGetValue(Headers.Type, out var typeName))
{
try
{
foundType = TypeCache.GetOrAdd(typeName, Type.GetType);
if (foundType != null) return foundType;
}
catch (Exception exception)
{
throw new FormatException($"Could not get .NET type named '{typeName}'", exception);
}
}
if (transportMessage.Headers.TryGetValue(_customTypeNameHeader, out var customTypeName))
{
foundType = TypeCache.GetValueOrDefault(customTypeName);
if (foundType != null) return foundType;
}
return null;
}
object Deserialize(string bodyString, Type type)
{
try
{
return type == null
? JsonConvert.DeserializeObject(bodyString, _settings)
: JsonConvert.DeserializeObject(bodyString, type, _settings);
}
catch (Exception exception)
{
if (bodyString.Length > 32768)
{
throw new FormatException($"Could not deserialize JSON text (original length: {bodyString.Length}): '{Limit(bodyString, 5000)}'", exception);
}
throw new FormatException($"Could not deserialize JSON text: '{bodyString}'", exception);
}
}
static string Limit(string bodyString, int maxLength)
{
return string.Concat(bodyString.Substring(0, maxLength), " (......)");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment