Skip to content

Instantly share code, notes, and snippets.

@ndemengel
Last active July 18, 2017 05:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ndemengel/0ee3944032cd32ce0594659dc3fe53ea to your computer and use it in GitHub Desktop.
Save ndemengel/0ee3944032cd32ce0594659dc3fe53ea to your computer and use it in GitHub Desktop.
RabbitMQ/Spring migration - Result
import lombok.Value;
// This code only works when compiled with javac's -parameters option (available since Java 8).
// With this option, Jackson can use the all-args constructor of our type.
// See https://gist.github.com/ndemengel/72f362bc31afe5fcaa0499af8f269651 for a more general solution.
/**
 * Event describing a change of email address on an account.
 * <p>
 * Annotated with Lombok's {@code @Value}; the accessors used elsewhere in this file
 * (e.g. {@code getOldEmail()}, {@code getNewEmail()}) and the all-args constructor are
 * not written by hand here.
 */
@Value
public class EmailUpdated implements Event {
// Exchange this event is published to; looked up reflectively by Event.getExchangeName via the field name "EXCHANGE_NAME".
public static final String EXCHANGE_NAME = "email.updated.exchange";
// Identifier of the account whose email changed.
String accountId;
// Email address before the update.
String oldEmail;
// Email address after the update.
String newEmail;
}
/**
 * Handles POST requests on {@code /account/email}.
 * <p>
 * Excerpt only: the parts marked {@code // ...} are elided in this file, including the
 * code that produces the returned {@code ModelAndView}. The visible statement shows the
 * {@code EmailUpdated} event being fired through {@code eventsService}.
 *
 * @param form the validated form submitted by the client
 */
@RequestMapping(path = "/account/email", method = POST)
public ModelAndView updateEmail(@Valid NewEmailForm form) {
// ...
// call some account-handling service which ultimately does the following:
eventsService.fire(new EmailUpdated(accountId, oldEmail(), newEmail));
// ...
}
/**
 * Consumes {@link EmailUpdated} events from the {@code email.updated.mailchimp.queue} queue
 * and passes the old and new addresses on to {@code updateMailChimpMember}.
 *
 * @param event the email change that was published
 */
@RabbitEventListener(queue = "email.updated.mailchimp.queue")
public void emailUpdated(EmailUpdated event) {
    String previousAddress = event.getOldEmail();
    String currentAddress = event.getNewEmail();
    updateMailChimpMember(previousAddress, currentAddress);
}
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
 * Contract for the messages published through {@code EventsService}.
 * <p>
 * By convention every implementing class declares a static {@code String} field named
 * {@link #EXCHANGE_FIELD_NAME} holding the name of the exchange its instances are sent to.
 * The helpers below read that field reflectively.
 */
public interface Event {

    /** Name of the static field each event class must declare to expose its exchange name. */
    String EXCHANGE_FIELD_NAME = "EXCHANGE_NAME";

    /**
     * Returns the exchange name declared by the class of the given event.
     *
     * @param event the event instance
     * @return the non-blank exchange name
     * @throws IllegalStateException if the event class does not follow the convention
     */
    public static String getExchangeName(Event event) {
        return getExchangeName(event.getClass());
    }

    /**
     * Returns the exchange name declared by the given event class.
     * <p>
     * Only fields declared directly on {@code eventClass} are considered (inherited fields are not).
     *
     * @param eventClass the event class to inspect
     * @return the non-blank exchange name
     * @throws IllegalStateException if the field is missing, not static, inaccessible,
     *                               not a {@code String}, or blank
     */
    public static String getExchangeName(Class<? extends Event> eventClass) {
        try {
            java.lang.reflect.Field field = eventClass.getDeclaredField(EXCHANGE_FIELD_NAME);
            // Field.get(null) on an instance field throws a bare NullPointerException; report the real problem instead.
            if (!java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                throw new IllegalStateException(String.format("Field %s of class %s is not static", EXCHANGE_FIELD_NAME, eventClass.getSimpleName()));
            }
            Object value = field.get(null);
            if (!(value instanceof String)) {
                throw new IllegalStateException(String.format("Field %s of class %s is not a String", EXCHANGE_FIELD_NAME, eventClass.getSimpleName()));
            }
            String exchangeName = (String) value;
            if (isBlank(exchangeName)) {
                throw new IllegalStateException(String.format("Field %s of class %s has blank value", EXCHANGE_FIELD_NAME, eventClass.getSimpleName()));
            }
            return exchangeName;
        } catch (NoSuchFieldException e) {
            // Keep the reflective exception as the cause so the stack trace is not lost.
            throw new IllegalStateException(String.format("Can't find field %s on class %s", EXCHANGE_FIELD_NAME, eventClass.getSimpleName()), e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(String.format("Can't access field %s on class %s", EXCHANGE_FIELD_NAME, eventClass.getSimpleName()), e);
        }
    }
}
/**
 * Entry point used by application code to publish {@link Event}s.
 */
public interface EventsService {

    /**
     * Publishes the given event.
     *
     * @param event the event to publish
     * @param <E>   the concrete event type
     * @return a value of the same type as the argument; the implementations in this file return {@code event} itself
     */
    <E extends Event> E fire(E event);
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
/**
 * {@link EventsService} implementation that publishes events to RabbitMQ through a {@code RabbitTemplate}.
 */
@Service
public class DefaultEventsService implements EventsService {

    @Inject
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the event to the exchange declared by its class, without any routing key.
     *
     * @param event the event to publish
     * @return the event that was passed in
     */
    @Override
    public <EventType extends Event> EventType fire(EventType event) {
        // No routing key is used: an empty string is passed in its place.
        rabbitTemplate.convertAndSend(Event.getExchangeName(event), "", event);
        return event;
    }
}
/**
 * Spring configuration of the RabbitMQ infrastructure used by the event bus.
 * <p>
 * It declares one exchange per {@link Event} class found on the classpath, the listener
 * container factory (with stateful retry), and the dead-letter / retry exchanges, queues
 * and listener that re-deliver failed messages after an increasing delay.
 * <p>
 * NOTE(review): {@code busEnabled}, {@code queueWithDelayedRetry} and
 * {@code buildRawListenerContainer} are referenced below but not declared in the visible
 * part of this class; they are left untouched and must be provided elsewhere.
 */
@Configuration
public class RabbitConfig {

    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String RETRY_ROUTING_KEY = "retry";

    private static final String BACK_FROM_THE_DEAD_EXCHANGE = "back.from.the.dead.exchange";
    private static final String DELAY_LETTER_QUEUE = "dead.letter.queue";
    private static final String RETRY_DELAY_HEADER = "retry-delay";
    private static final String RETRY_QUEUE = "retry.queue";
    private static final String TIMEOUT_ROUTING_KEY = "timeout";
    private static final String TO_DELIVER_HEADER = "toDeliver";

    @Inject
    private ConfigurableListableBeanFactory beanRegistry;

    @Value("${rabbit.listeners.number.consumers:5}")
    private int nbOfConsumers;

    @Value("${rabbit.listeners.max.number.consumers:10}")
    private int maxNumberOfConsumers;

    /**
     * Scans the {@code com.hopwork} package for {@link Event} implementations and makes sure
     * an exchange bean exists for each of them.
     */
    @PostConstruct
    public void createExchangesFromEventsInClasspath() {
        ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AssignableTypeFilter(Event.class));
        scanner.findCandidateComponents("com.hopwork").forEach(candidate -> {
            try {
                // asSubclass performs a checked narrowing instead of an unchecked generic cast.
                Class<? extends Event> eventClass = Class.forName(candidate.getBeanClassName()).asSubclass(Event.class);
                String exchangeName = Event.getExchangeName(eventClass);
                RabbitEventListenerPostProcessor.ensureExchangeExists(exchangeName, beanRegistry);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Can't load event class " + candidate.getBeanClassName(), e);
            }
        });
    }

    /**
     * Registers {@link RabbitEventListenerPostProcessor} under Spring AMQP's well-known
     * listener annotation processor bean name.
     */
    // NOTE(review): the original carried @Override here although RabbitConfig declares no
    // supertype, which does not compile as shown; the annotation has been dropped.
    @Primary
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitEventListenerPostProcessor(beanRegistry, backFromTheDeadExchange());
    }

    /**
     * Builds the listener container factory: configured consumer counts, plus an advice chain
     * made of a missing-message-id advice and a stateful retry interceptor whose recoverer
     * republishes messages to {@link #DEAD_LETTER_EXCHANGE} with {@link #RETRY_ROUTING_KEY}.
     *
     * @param rabbitTemplate    template used by the recoverer to republish messages
     * @param connectionFactory connection factory used by the listener containers
     * @return the configured factory
     */
    @Primary
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) throws NoSuchAlgorithmException, KeyManagementException {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(nbOfConsumers);
        factory.setMaxConcurrentConsumers(maxNumberOfConsumers);
        factory.setConnectionFactory(connectionFactory);

        // The same retry context cache is shared by the retry template and the missing-id advice.
        RetryContextCache cache = new MapRetryContextCache();
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryContextCache(cache);

        StatefulRetryOperationsInterceptorFactoryBean fb = new StatefulRetryOperationsInterceptorFactoryBean();
        fb.setRetryOperations(retryTemplate);
        fb.setMessageRecoverer(new RepublishMessageRecoverer(rabbitTemplate, DEAD_LETTER_EXCHANGE, RETRY_ROUTING_KEY));

        MissingMessageIdAdvice missingIdAdvice = new MissingMessageIdAdvice(cache);
        Advice retryInterceptor = fb.getObject();
        factory.setAdviceChain(missingIdAdvice, retryInterceptor);
        return factory;
    }

    /** Durable, non-auto-deleted direct exchange receiving messages to retry or delay. */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }

    /** Direct exchange through which delayed messages are routed back to their queue. */
    @Bean
    public DirectExchange backFromTheDeadExchange() {
        return new DirectExchange(BACK_FROM_THE_DEAD_EXCHANGE);
    }

    /** Queue named {@code dead.letter.queue}, where messages wait until their expiration. */
    @Bean
    public Queue deadLetterQueue() {
        return queueWithDelayedRetry(DELAY_LETTER_QUEUE);
    }

    /** Queue named {@code retry.queue}, consumed by {@link #deadLetterListener}. */
    @Bean
    public Queue retryQueue() {
        return queueWithDelayedRetry(RETRY_QUEUE);
    }

    /** Binds the dead-letter queue to the dead-letter exchange with the {@code timeout} routing key. */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(TIMEOUT_ROUTING_KEY);
    }

    /** Binds the retry queue to the dead-letter exchange with the {@code retry} routing key. */
    @Bean
    public Binding retryQueueBinding() {
        return BindingBuilder.bind(retryQueue()).to(deadLetterExchange()).with(RETRY_ROUTING_KEY);
    }

    /**
     * Listener on the retry queue implementing the delayed re-delivery.
     * <ul>
     * <li>A message without the {@code toDeliver} header is sent to the dead-letter exchange with the
     * {@code timeout} routing key, flagged {@code toDeliver}, with an expiration equal to the next delay.</li>
     * <li>A message carrying the {@code toDeliver} header is sent to the back-from-the-dead exchange,
     * routed with the name of the queue found in its {@code x-death} header, with the flag removed.</li>
     * </ul>
     *
     * @return the listener container, or {@code null} when {@code busEnabled} is false
     */
    @Bean
    public SimpleMessageListenerContainer deadLetterListener(ConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate) throws GeneralSecurityException, IOException {
        if (busEnabled) {
            return buildRawListenerContainer(connectionFactory, RETRY_QUEUE, new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    Map<String, Object> headers = message.getMessageProperties().getHeaders();
                    if (headers.containsKey(TO_DELIVER_HEADER)) {
                        // The delay has elapsed: hand the message back to the queue that rejected it.
                        rabbitTemplate.send(backFromTheDeadExchange().getName(), findLastRejectedQueue(headers),
                                MessageBuilder.fromMessage(message).removeHeader(TO_DELIVER_HEADER).build());
                    } else {
                        // First pass: park the message with an expiration so it comes back later.
                        Integer expiration = computeNextDelay((Integer) headers.get(RETRY_DELAY_HEADER));
                        rabbitTemplate.send(DEAD_LETTER_EXCHANGE, TIMEOUT_ROUTING_KEY,
                                MessageBuilder.fromMessage(message)
                                        .setHeader(TO_DELIVER_HEADER, true)
                                        .setHeader(RETRY_DELAY_HEADER, expiration)
                                        .setExpiration(String.valueOf(expiration)).build());
                    }
                }

                /** Returns the {@code queue} of the first {@code x-death} entry whose reason is {@code rejected}. */
                @SuppressWarnings("unchecked")
                private String findLastRejectedQueue(Map<String, Object> headers) {
                    return (String) FluentIterable
                            .from((Iterable<Map<String, Object>>) headers.get("x-death"))
                            .firstMatch(x -> "rejected".equals(x.get("reason")))
                            .transform(x -> x.get("queue"))
                            .get();
                }
            });
        } else {
            return null;
        }
    }

    /**
     * Computes the delay to apply before the next delivery attempt.
     *
     * @param previousDelayInMs delay applied to the previous attempt, or {@code null} if there was none
     * @return 2000 ms when there is no previous delay, otherwise twice the previous delay capped at 60000 ms
     */
    private static int computeNextDelay(Integer previousDelayInMs) {
        int defaultDelayInMs = 2000;
        int maxDelayInMs = 60000;
        if (previousDelayInMs == null) {
            return defaultDelayInMs;
        }
        // Double in long arithmetic so an unexpectedly large header value cannot overflow.
        return (int) Math.min(previousDelayInMs * 2L, maxDelayInMs);
    }
}
public class RabbitEventListenerPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {
private static final String DEAD_LETTER_BINDING_BEAN_SUFFIX = "DeadLetterBinding";
private static final String EXCHANGE_BINDING_BEAN_SUFFIX = "ExchangeBinding";
private static final String QUEUE_BEAN_SUFFIX = "Queue";
private final DirectExchange backFromTheDeadExchange;
private final ConfigurableBeanFactory beanRegistry;
public RabbitEventListenerPostProcessor(ConfigurableBeanFactory beanRegistry, DirectExchange backFromTheDeadExchange) {
this.beanRegistry = beanRegistry;
this.backFromTheDeadExchange = backFromTheDeadExchange;
}
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
ReflectionUtils.doWithMethods(targetClass, method ->
initRabbitListener(bean, beanName, method), ReflectionUtils.USER_DECLARED_METHODS);
return super.postProcessAfterInitialization(bean, beanName);
}
private void initRabbitListener(Object bean, String beanName, Method method) {
RabbitEventListener bindingDescriptor = AnnotationUtils.findAnnotation(method, RabbitEventListener.class);
if (bindingDescriptor != null) {
checkState(isNotBlank(bindingDescriptor.queue()), "No queue name defined for listener " + method.getName());
Class<? extends Event> eventClass = getEventClass(method);
String exchangeName = Event.getExchangeName(eventClass);
FanoutExchange exchange = ensureExchangeExists(exchangeName);
createAndBindQueueToExchangeAndDeadLetter(exchange, bindingDescriptor.queue());
processAmqpListener(toSpringRabbitListenerAnnotation(bindingDescriptor), method, bean, beanName);
}
}
private Class<? extends Event> getEventClass(Method method) {
return stream(method.getParameterTypes())
.filter(Event.class::isAssignableFrom)
.map(cls -> (Class<Event>) cls)
.findFirst()
.orElseThrow(() -> new IllegalStateException(String.format(
"Method %s is annotated with %s but has no argument of type %s",
method.getName(), RabbitEventListener.class.getSimpleName(), Event.class.getSimpleName())));
}
private RabbitListener toSpringRabbitListenerAnnotation(final RabbitEventListener bindingDescriptor) {
// an annotation is just an interface, let's create an anonymous implementation!
return new RabbitListener() {
@Override
public Class<? extends Annotation> annotationType() {
return RabbitListener.class;
}
@Override
public String id() {
return "";
}
@Override
public String containerFactory() {
return bindingDescriptor.containerFactory();
}
@Override
public String[] queues() {
return new String[]{bindingDescriptor.queue()};
}
@Override
public boolean exclusive() {
return false;
}
@Override
public String priority() {
return "";
}
@Override
public String admin() {
return "";
}
@Override
public QueueBinding[] bindings() {
return new QueueBinding[0];
}
@Override
public String group() {
return "";
}
};
}
private FanoutExchange ensureExchangeExists(String exchangeName) {
return ensureExchangeExists(exchangeName, beanRegistry);
}
static FanoutExchange ensureExchangeExists(String exchangeName, ConfigurableBeanFactory beanRegistry) {
String exchangeBeanName = beanNameForExchange(exchangeName);
if (beanRegistry.containsBean(exchangeBeanName)) {
return beanRegistry.getBean(exchangeBeanName, FanoutExchange.class);
}
FanoutExchange exchange = new FanoutExchange(exchangeName, true, false);
beanRegistry.registerSingleton(exchangeBeanName, exchange);
return exchange;
}
private void createAndBindQueueToExchangeAndDeadLetter(FanoutExchange exchange, String queueName) {
String beanPrefix = beanPrefixForQueue(queueName);
Queue queue = queueWithDelayedRetry(queueName);
beanRegistry.registerSingleton(beanPrefix + QUEUE_BEAN_SUFFIX, queue);
bind(queueName, beanPrefix, queue, exchange);
}
private static String beanNameForExchange(String exchangeName) {
// "generated__" prefix prevents collision with beans created by developers
return "generated__" +
StringUtils.uncapitalize(stream(exchangeName.split("\\."))
.filter(n -> !n.isEmpty())
.map(StringUtils::capitalize)
.collect(joining("")));
}
private String beanPrefixForQueue(String queue) {
return queue.replace(".", "").replace("queue", "");
}
private Queue queueWithDelayedRetry(String queueName) {
return new Queue(queueName, true, false, false,
ImmutableMap.of(
"x-dead-letter-exchange", RabbitConfig.DEAD_LETTER_EXCHANGE,
"x-dead-letter-routing-key", RabbitConfig.RETRY_ROUTING_KEY));
}
private void bind(String queueName, String beanPrefix, Queue queue, FanoutExchange exchange) {
Binding bindingToExchange = BindingBuilder.bind(queue).to(exchange);
Binding bindingToDeadLetter = BindingBuilder.bind(queue).to(backFromTheDeadExchange)
.with(queueName);
beanRegistry.registerSingleton(beanPrefix + exchange.getName() + EXCHANGE_BINDING_BEAN_SUFFIX, bindingToExchange);
beanRegistry.registerSingleton(beanPrefix + exchange.getName() + DEAD_LETTER_BINDING_BEAN_SUFFIX, bindingToDeadLetter);
}
}
/**
 * Stub {@link EventsService} that publishes nothing; useful for simple tests.
 */
public class NullEventsService implements EventsService {

    /**
     * Does nothing with the event.
     *
     * @param event the event that would have been published
     * @return the event that was passed in
     */
    @Override
    public <E extends Event> E fire(E event) {
        return event;
    }
}
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.hopwork.config.RabbitEventListener;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Simple implementation that directly calls the listeners when an event is fired.
 * Useful for testing publishers and consumers in integration, in a synchronous way
 * (within the same app only).
 * <p>
 * Listeners are the {@code @RabbitEventListener} methods of the objects given to the constructor
 * and of the beans seen by this post-processor. They are looked up by the exact runtime class of
 * the fired event and invoked with the event as their single argument.
 */
public class JvmSynchronizedEventsService implements BeanPostProcessor, EventsService {

    private final Multimap<Class<? extends Event>, Pair<Object, Method>> listenerRegistry = HashMultimap.create();

    /**
     * @param listeners objects whose {@code @RabbitEventListener} methods must be registered right away
     */
    public JvmSynchronizedEventsService(Object... listeners) {
        for (Object listener : listeners) {
            maybeRegisterListener(listener);
        }
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /** Registers the listener methods of every initialized bean and returns the bean unchanged. */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        maybeRegisterListener(bean);
        return bean;
    }

    /**
     * Synchronously invokes every listener method registered for the class of the event.
     *
     * @param event the event to deliver
     * @return the event that was passed in
     * @throws RuntimeException wrapping the reflective failure when a listener cannot be invoked or throws
     */
    @Override
    public <EventType extends Event> EventType fire(EventType event) {
        listenerRegistry.get(event.getClass()).forEach(beanAndMethod -> {
            try {
                beanAndMethod.getRight().invoke(beanAndMethod.getLeft(), event);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException(e);
            }
        });
        return event;
    }

    /** Records every {@code @RabbitEventListener} method of the object under the event type it accepts. */
    private void maybeRegisterListener(Object listener) {
        Class<?> targetClass = AopUtils.getTargetClass(listener);
        ReflectionUtils.doWithMethods(targetClass, method -> {
            if (AnnotationUtils.findAnnotation(method, RabbitEventListener.class) != null) {
                listenerRegistry.put(eventClassOf(method), Pair.of(listener, method));
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
    }

    /**
     * Returns the type of the first {@link Event} parameter of a listener method, with the same rule
     * and error message as {@code RabbitEventListenerPostProcessor}. This replaces a blind read of
     * parameter 0, which failed with an ArrayIndexOutOfBoundsException on no-arg methods and could
     * register a non-event type.
     *
     * @throws IllegalStateException if the method has no parameter of type {@link Event}
     */
    private static Class<? extends Event> eventClassOf(Method method) {
        for (Class<?> parameterType : method.getParameterTypes()) {
            if (Event.class.isAssignableFrom(parameterType)) {
                return parameterType.asSubclass(Event.class);
            }
        }
        throw new IllegalStateException(String.format(
                "Method %s is annotated with %s but has no argument of type %s",
                method.getName(), RabbitEventListener.class.getSimpleName(), Event.class.getSimpleName()));
    }
}
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Example of an integration test wiring a publisher and a consumer together through
 * {@code JvmSynchronizedEventsService}, so that firing an event calls the consumer
 * synchronously within the test.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {
SomePublisher.class,
JvmSynchronizedEventsService.class,
SomeConsumer.class
})
public class SomeTest {
@Inject
private SomePublisher somePublisher;
// Fires an event through the publisher, then checks the consumer's effect.
// NOTE(review): assertThatEventHasBeenConsumedInTheExpectedWay() is not defined in the visible code.
@Test
public void should_observe_event_publication_and_consumption() throws Exception {
// when
somePublisher.doSomethingThatFiresAnEvent();
// then
assertThatEventHasBeenConsumedInTheExpectedWay();
}
}
import org.junit.Test;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AssignableTypeFilter;
import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.util.*;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
/**
 * Enforce {@link Event}s' invariants.
 */
public class EventDefinitionsTest {

    /** Root package scanned for {@link Event} implementations. */
    private static final String SCANNED_PACKAGE = "com.hopwork";

    /**
     * Ensure that the convention is respected and the documentation can be generated.
     * <p>
     * The field is looked up with {@code getDeclaredField}, like {@code Event.getExchangeName} does at
     * runtime, so that a non-public or inherited field is actually detected.
     */
    @Test
    public void event_classes_should_have_a_public_static_final_EXCHANGE_NAME_field_which_content_has_the_right_suffix() throws Exception {
        for (Class<?> eventClass : findEventClasses()) {
            String fieldLabel = eventClass.getName() + "." + Event.EXCHANGE_FIELD_NAME;
            try {
                Field exchangeNameField = eventClass.getDeclaredField(Event.EXCHANGE_FIELD_NAME);
                int modifiers = exchangeNameField.getModifiers();
                assertTrue(fieldLabel + " is not public", Modifier.isPublic(modifiers));
                assertTrue(fieldLabel + " is not static", Modifier.isStatic(modifiers));
                assertTrue(fieldLabel + " is not final", Modifier.isFinal(modifiers));
                String exchangeName = (String) exchangeNameField.get(null);
                assertThat(exchangeName).isNotNull().endsWith(".exchange");
            } catch (IllegalAccessException | NoSuchFieldException e) {
                throw new RuntimeException("Error testing class: " + eventClass.getName(), e);
            }
        }
    }

    /**
     * Prevent rules to be violated by subclassing an event class.
     * {@link Event}s conceptually are Value Classes, anyway.
     */
    @Test
    public void event_classes_should_be_final() throws Exception {
        for (Class<?> eventClass : findEventClasses()) {
            assertTrue(eventClass.getName() + " is not final", Modifier.isFinal(eventClass.getModifiers()));
        }
    }

    /**
     * Inter-app communication should not be broken because of common refactoring of business logic.
     * Therefore {@link Event}s should only depend on classes of the standard library or on utility classes
     * created specifically for them.
     * <p>Note: this test does its best to prevent inclusion of domain classes, but can't enforce it.
     * Deprecated event classes are not checked.</p>
     */
    @Test
    public void event_classes_should_not_depend_on_domain_classes() throws Exception {
        Set<String> forbiddenPackages = new HashSet<>();
        forbiddenPackages.add("com.hopwork.xxx.domain1");
        forbiddenPackages.add("com.hopwork.xxx.domain2");
        // ...

        for (Class<?> eventClass : findEventClasses()) {
            if (eventClass.isAnnotationPresent(Deprecated.class)) {
                continue;
            }
            Set<String> usedPackages = collectUsedPackages(eventClass);
            usedPackages.forEach(p ->
                    forbiddenPackages.forEach(fp ->
                            assertThat(p).as(eventClass.getName() + " used classes").doesNotStartWith(fp)));
        }
    }

    /**
     * Loads every {@link Event} implementation found under {@link #SCANNED_PACKAGE}.
     *
     * @throws RuntimeException if a class reported by the scanner cannot be loaded
     */
    private static List<Class<?>> findEventClasses() {
        ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AssignableTypeFilter(Event.class));
        List<Class<?>> eventClasses = new ArrayList<>();
        scanner.findCandidateComponents(SCANNED_PACKAGE).forEach(candidate -> {
            try {
                eventClasses.add(Class.forName(candidate.getBeanClassName()));
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Error testing class: " + candidate.getBeanClassName(), e);
            }
        });
        return eventClasses;
    }

    /**
     * Gathers the packages of the types and annotations appearing in the declared fields,
     * constructors and methods of the given class.
     */
    private static Set<String> collectUsedPackages(Class<?> eventClass) {
        Set<String> usedPackages = new HashSet<>();
        for (Field field : eventClass.getDeclaredFields()) {
            addPackageOf(field.getType(), usedPackages);
            addAnnotationPackages(field.getAnnotations(), usedPackages);
        }
        for (Constructor<?> constructor : eventClass.getDeclaredConstructors()) {
            addAnnotationPackages(constructor.getAnnotations(), usedPackages);
            for (Class<?> parameterType : constructor.getParameterTypes()) {
                addTypeAndItsAnnotations(parameterType, usedPackages);
            }
        }
        for (Method method : eventClass.getDeclaredMethods()) {
            addAnnotationPackages(method.getAnnotations(), usedPackages);
            for (Class<?> parameterType : method.getParameterTypes()) {
                addTypeAndItsAnnotations(parameterType, usedPackages);
            }
            addTypeAndItsAnnotations(method.getReturnType(), usedPackages);
        }
        return usedPackages;
    }

    /** Adds the package of a type and the packages of the annotations present on that type. */
    private static void addTypeAndItsAnnotations(Class<?> type, Set<String> packages) {
        addPackageOf(type, packages);
        addAnnotationPackages(type.getAnnotations(), packages);
    }

    /** Adds the package of each annotation's type. */
    private static void addAnnotationPackages(Annotation[] annotations, Set<String> packages) {
        for (Annotation annotation : annotations) {
            addPackageOf(annotation.annotationType(), packages);
        }
    }

    /**
     * Adds the package of the given type, if it has one. Array types are unwrapped to their
     * element type first, since an array class itself reports no package.
     */
    private static void addPackageOf(Class<?> type, Set<String> packages) {
        Class<?> elementType = type;
        while (elementType.isArray()) {
            elementType = elementType.getComponentType();
        }
        if (elementType.getPackage() != null) {
            packages.add(elementType.getPackage().getName());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment