Last active
April 30, 2017 18:59
-
-
Save ndemengel/1e7ee49278d91d62b6994b94f7c9866c to your computer and use it in GitHub Desktop.
RabbitMQ/Spring migration - Step 2: let's craft our own annotation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// let's use our own annotation! | |
@RabbitEventListener(queue = "email.updated.mailchimp.queue", exchange = RabbitConfig.EMAIL_UPDATED_EXCHANGE) | |
public void emailUpdated(EmailUpdated event) { | |
updateMailChimpMember(event.getOldEmail(), event.getNewEmail()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Documented | |
@Retention(RetentionPolicy.RUNTIME) | |
@Target(ElementType.METHOD) | |
public @interface RabbitEventListener { | |
String exchange(); | |
String queue(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@PostConstruct | |
public void createExchanges() { | |
// We don't create the queues anymore here! We don't care about the bindings either :-) | |
createExchangeBean(EMAIL_UPDATED_EXCHANGE); | |
// ... other exchanges... | |
} | |
@Override | |
@Primary | |
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) | |
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) | |
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() { | |
return new RabbitEventListenerPostProcessor(beanRegistry, backFromTheDeadExchange()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RabbitEventListenerPostProcessor extends RabbitListenerAnnotationBeanPostProcessor { | |
@Override | |
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { | |
Class<?> targetClass = AopUtils.getTargetClass(bean); | |
ReflectionUtils.doWithMethods(targetClass, method -> | |
initRabbitListener(bean, beanName, method), ReflectionUtils.USER_DECLARED_METHODS); | |
return super.postProcessAfterInitialization(bean, beanName); | |
} | |
private void initRabbitListener(Object bean, String beanName, Method method) { | |
RabbitEventListener bindingDescriptor = AnnotationUtils.findAnnotation(method, RabbitEventListener.class); | |
if (bindingDescriptor != null) { | |
FanoutExchange exchange = ensureExchangeExists(bindingDescriptor.exchange()); | |
// same code as in previous step | |
createAndBindQueueToExchangeAndDeadLetter(exchange, bindingDescriptor.queue()); | |
// now that we are sure that both the exchange and queue exist, we can let Spring do its magic | |
processAmqpListener(toSpringRabbitListenerAnnotation(bindingDescriptor), method, bean, beanName); | |
} | |
} | |
private RabbitListener toSpringRabbitListenerAnnotation(final RabbitEventListener bindingDescriptor) { | |
// an annotation is just an interface, let's create an anonymous implementation! | |
return new RabbitListener() { | |
@Override | |
public Class<? extends Annotation> annotationType() { | |
return RabbitListener.class; | |
} | |
@Override | |
public String[] queues() { | |
return new String[]{bindingDescriptor.queue()}; | |
} | |
// ... the other overridden methods just return the default values specified in @RabbitListener... | |
}; | |
} | |
private FanoutExchange ensureExchangeExists(String exchangeName) { | |
String exchangeBeanName = beanNameForExchange(exchangeName); | |
if (beanRegistry.containsBean(exchangeBeanName)) { | |
return beanRegistry.getBean(exchangeBeanName, FanoutExchange.class); | |
} | |
FanoutExchange exchange = new FanoutExchange(exchangeName, true, false); | |
beanRegistry.registerSingleton(exchangeBeanName, exchange); | |
return exchange; | |
} | |
// ... more code... | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment