Skip to content

Instantly share code, notes, and snippets.

@steelThread
Created January 25, 2016 20:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steelThread/10fd1f26d7561180a761 to your computer and use it in GitHub Desktop.
Save steelThread/10fd1f26d7561180a761 to your computer and use it in GitHub Desktop.
/**
 * Custom {@link AMQPMessageConverter} that serializes a complete {@link EventMessage} to a single
 * JSON document and deserializes such a document back into a {@link GenericEventMessage}. Useful
 * for passing event messages between Axon and non-Axon environments.
 *
 * @author Sean McDaniel
 * @since January 25, 2016
 */
public class JsonAmqpMessageConverter implements AMQPMessageConverter {

    private static final Logger log = LoggerFactory.getLogger(JsonAmqpMessageConverter.class);

    /** Message properties with delivery mode 2, which AMQP 0-9-1 defines as "persistent". */
    private static final AMQP.BasicProperties DURABLE =
            new AMQP.BasicProperties.Builder().deliveryMode(2).build();

    private final ObjectMapper objectMapper;
    private final RoutingKeyResolver routingKeyResolver;
    private final boolean durable;

    /**
     * Initializes the converter with a {@code FieldAccessJavaTimeSupportingObjectMapper}, a
     * {@code PackageRoutingKeyResolver}, and durable dispatching requested.
     */
    public JsonAmqpMessageConverter() {
        this(new FieldAccessJavaTimeSupportingObjectMapper(), new PackageRoutingKeyResolver(), true);
    }

    /**
     * Initializes the converter with the given collaborators.
     *
     * <p>A {@code JodaModule} is registered on the supplied {@code objectMapper} instance itself,
     * so the caller's mapper is modified by this constructor.
     *
     * @param objectMapper       The ObjectMapper used to serialize and deserialize Event Messages
     * @param routingKeyResolver The strategy to use to resolve routing keys for Event Messages
     * @param durable            Whether to request durable message dispatching
     * @throws NullPointerException if {@code objectMapper} is {@code null}
     */
    public JsonAmqpMessageConverter(ObjectMapper objectMapper,
                                    RoutingKeyResolver routingKeyResolver,
                                    boolean durable) {
        // Fully qualified so that no additional import is required by this file.
        java.util.Objects.requireNonNull(objectMapper, "objectMapper");
        this.objectMapper = objectMapper.registerModule(new JodaModule());
        this.routingKeyResolver = routingKeyResolver;
        this.durable = durable;
    }

    /**
     * Serializes the whole event message to JSON bytes and wraps them, together with the resolved
     * routing key, in an {@link AMQPMessage}. When durability was requested the message carries the
     * {@link #DURABLE} properties.
     *
     * @param eventMessage the event message to publish
     * @return the AMQP message to send
     * @throws RuntimeException if serialization or routing key resolution fails; the original
     *                          exception is preserved as the cause
     */
    @Override
    public AMQPMessage createAMQPMessage(EventMessage eventMessage) {
        try {
            byte[] body = objectMapper.writeValueAsBytes(eventMessage);
            String routingKey = routingKeyResolver.resolveRoutingKey(eventMessage);
            if (durable) {
                return new AMQPMessage(body, routingKey, DURABLE, false, false);
            }
            return new AMQPMessage(body, routingKey);
        } catch (Exception e) {
            log.error("Unable to serialize EventMessage.", e);
            throw new RuntimeException("Unable to serialize EventMessage.", e);
        }
    }

    /**
     * Rebuilds an event message from the JSON body produced by
     * {@link #createAMQPMessage(EventMessage)}. The AMQP headers are not consulted; everything is
     * read from the body.
     *
     * @param messageBody the JSON encoded event message
     * @param headers     the AMQP headers (ignored)
     * @return the reconstructed event message
     * @throws RuntimeException if the body cannot be parsed, lacks a payload or payload type, or the
     *                          payload class cannot be loaded; the original exception is the cause
     */
    @Override
    public EventMessage readAMQPMessage(byte[] messageBody, Map<String, Object> headers) {
        try {
            return deserialize(messageBody);
        } catch (Exception e) {
            log.error("Unable to deserialize EventMessage.", e);
            throw new RuntimeException("Unable to deserialize EventMessage.", e);
        }
    }

    /**
     * Parses the envelope first (identifier, timestamp, meta data, payload type and the raw payload
     * tree) and then binds the payload tree to the class named by the envelope.
     *
     * @param messageBody the JSON encoded event message
     * @return a GenericEventMessage carrying the typed payload
     * @throws Exception if parsing, class loading or payload binding fails
     */
    private GenericEventMessage<Object> deserialize(byte[] messageBody) throws Exception {
        EventMessageMeta meta = objectMapper.readValue(messageBody, EventMessageMeta.class);
        if (meta.payloadType == null || meta.payload == null) {
            // Fail with a descriptive message instead of an anonymous NullPointerException.
            throw new IllegalArgumentException(
                    "Event message JSON must contain both 'payloadType' and 'payload'");
        }

        // SECURITY NOTE(review): payloadType is taken verbatim from the message body, so whoever can
        // publish to the queue chooses which class is loaded and instantiated here. If the broker is
        // not fully trusted, restrict this to an allow-list of known event types.
        Class<?> payloadClass = Class.forName(meta.payloadType);

        // Bind straight from the parsed tree rather than rendering it to a String and re-parsing.
        Object payload = objectMapper.treeToValue(meta.payload, payloadClass);

        return new GenericEventMessage<Object>(meta.identifier, meta.timestamp, payload, meta.metaData);
    }

    /**
     * Envelope view of a serialized event message: the message attributes plus the still-untyped
     * payload tree, so the payload can be bound once its class is known.
     *
     * <p>No accessors are declared; the fields are populated by the ObjectMapper, which therefore
     * has to be able to write private fields (presumably what
     * {@code FieldAccessJavaTimeSupportingObjectMapper} is configured for - verify when supplying a
     * different mapper).
     */
    public static class EventMessageMeta {
        /** Identifier of the event message. */
        private String identifier;
        /** Meta data entries of the event message. */
        private Map<String, String> metaData;
        /** Name of the payload class, as passed to {@link Class#forName(String)}. */
        private String payloadType;
        /** Payload as an unbound JSON tree. */
        private JsonNode payload;
        /** Timestamp of the event message. */
        private DateTime timestamp;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment