-
-
Save steelThread/10fd1f26d7561180a761 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Custom AMQPMessageConverter that serialization/deserialization of a full | |
* json message. Useful for passing event messages between axon and non axon | |
* environments. | |
* | |
* @author Sean McDaniel | |
* @since January 25, 2016 | |
*/ | |
public class JsonAmqpMessageConverter implements AMQPMessageConverter { | |
private static Logger log = LoggerFactory.getLogger(JsonAmqpMessageConverter.class); | |
private static final AMQP.BasicProperties DURABLE = new AMQP.BasicProperties.Builder().deliveryMode(2).build(); | |
private ObjectMapper objectMapper; | |
private RoutingKeyResolver routingKeyResolver; | |
private boolean durable; | |
/** | |
* Initializes the AMQPMessageConverter instance using a FieldAccessJavaTimeSupportingObjectMapper, PackageRoutingKeyResolver, | |
* with requesting durable dispatching. | |
*/ | |
public JsonAmqpMessageConverter() { | |
this(new FieldAccessJavaTimeSupportingObjectMapper(), new PackageRoutingKeyResolver(), true); | |
} | |
/** | |
* Build an instance using a default tFieldAccessJavaTimeSupportingObjectMapper, PackageRoutingKeyResolver, | |
* with durability. | |
* | |
* @param objectMapper The ObjectMapper to serialize the Event Message's payload and Meta Data with | |
* @param routingKeyResolver The strategy to use to resolve routing keys for Event Messages | |
* @param durable Whether to request durable message dispatching | |
*/ | |
public JsonAmqpMessageConverter(ObjectMapper objectMapper, | |
RoutingKeyResolver routingKeyResolver, | |
boolean durable) | |
{ | |
this.objectMapper = objectMapper.registerModule(new JodaModule()); | |
this.routingKeyResolver = routingKeyResolver; | |
this.durable = durable; | |
} | |
@Override | |
public AMQPMessage createAMQPMessage(EventMessage eventMessage) { | |
try { | |
byte[] body = bytes(eventMessage); | |
String routingKey = routingKeyResolver.resolveRoutingKey(eventMessage); | |
return durable | |
? new AMQPMessage(body, routingKey, DURABLE, false, false) | |
: new AMQPMessage(body, routingKey); | |
} catch (Exception e) { | |
log.error("Unable to serialize EventMessage.", e); | |
throw new RuntimeException(e); | |
} | |
} | |
@Override | |
public EventMessage readAMQPMessage(byte[] messageBody, Map<String, Object> headers) { | |
try { return new GenericEventMessageDeserializer(messageBody).deserialize(); } | |
catch (Exception e) { | |
log.error("Unable to deserialize EventMessage.", e); | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* Serialize the EventMessage to JSON and then to bytes. | |
* | |
* @param eventMessage the EventMessage to serialize | |
* @return bytes | |
*/ | |
private byte[] bytes(EventMessage eventMessage) throws Exception { | |
return objectMapper.writeValueAsBytes(eventMessage); | |
} | |
/////////////////////////////////////////////////////////////////////////////////////////// | |
// Encapsulates deserialization of the EventMessage bytes to a GenericEventMessage instance | |
private class GenericEventMessageDeserializer { | |
private byte[] eventMessage; | |
private GenericEventMessageDeserializer(byte[] eventMessage) { | |
this.eventMessage = eventMessage; | |
} | |
private GenericEventMessage deserialize() throws Exception { | |
EventMessageMeta meta = objectMapper.readValue(eventMessage, EventMessageMeta.class); | |
return new GenericEventMessage( | |
meta.identifier, | |
meta.timestamp, | |
objectMapper.readValue(meta.payload.toString(), resolveClass(meta.payloadType)), | |
meta.metaData | |
); | |
} | |
private Class resolveClass(String name) throws Exception { | |
return Class.forName(name); | |
} | |
} | |
/////////////////////////////////////////////////////////////////////////////////////////// | |
// Simple helper to pull out the message meta data and simplify parsing | |
public static class EventMessageMeta { | |
private String identifier; | |
private Map<String, String> metaData; | |
private String payloadType; | |
private JsonNode payload; | |
private DateTime timestamp; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment