Skip to content

Instantly share code, notes, and snippets.

@ndemengel
Last active April 30, 2017 18:59
Show Gist options
  • Save ndemengel/1e7ee49278d91d62b6994b94f7c9866c to your computer and use it in GitHub Desktop.
Save ndemengel/1e7ee49278d91d62b6994b94f7c9866c to your computer and use it in GitHub Desktop.
RabbitMQ/Spring migration - Step 2: let's craft our own annotation
// let's use our own annotation!
@RabbitEventListener(queue = "email.updated.mailchimp.queue", exchange = RabbitConfig.EMAIL_UPDATED_EXCHANGE)
public void emailUpdated(EmailUpdated event) {
updateMailChimpMember(event.getOldEmail(), event.getNewEmail());
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RabbitEventListener {
String exchange();
String queue();
}
@PostConstruct
public void createExchanges() {
// We don't create the queues anymore here! We don't care about the bindings either :-)
createExchangeBean(EMAIL_UPDATED_EXCHANGE);
// ... other exchanges...
}
@Override
@Primary
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitEventListenerPostProcessor(beanRegistry, backFromTheDeadExchange());
}
public class RabbitEventListenerPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
ReflectionUtils.doWithMethods(targetClass, method ->
initRabbitListener(bean, beanName, method), ReflectionUtils.USER_DECLARED_METHODS);
return super.postProcessAfterInitialization(bean, beanName);
}
private void initRabbitListener(Object bean, String beanName, Method method) {
RabbitEventListener bindingDescriptor = AnnotationUtils.findAnnotation(method, RabbitEventListener.class);
if (bindingDescriptor != null) {
FanoutExchange exchange = ensureExchangeExists(bindingDescriptor.exchange());
// same code as in previous step
createAndBindQueueToExchangeAndDeadLetter(exchange, bindingDescriptor.queue());
// now that we are sure that both the exchange and queue exist, we can let Spring do its magic
processAmqpListener(toSpringRabbitListenerAnnotation(bindingDescriptor), method, bean, beanName);
}
}
private RabbitListener toSpringRabbitListenerAnnotation(final RabbitEventListener bindingDescriptor) {
// an annotation is just an interface, let's create an anonymous implementation!
return new RabbitListener() {
@Override
public Class<? extends Annotation> annotationType() {
return RabbitListener.class;
}
@Override
public String[] queues() {
return new String[]{bindingDescriptor.queue()};
}
// ... the other overridden methods just return the default values specified in @RabbitListener...
};
}
private FanoutExchange ensureExchangeExists(String exchangeName) {
String exchangeBeanName = beanNameForExchange(exchangeName);
if (beanRegistry.containsBean(exchangeBeanName)) {
return beanRegistry.getBean(exchangeBeanName, FanoutExchange.class);
}
FanoutExchange exchange = new FanoutExchange(exchangeName, true, false);
beanRegistry.registerSingleton(exchangeBeanName, exchange);
return exchange;
}
// ... more code...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment