Skip to content

Instantly share code, notes, and snippets.

@mhewedy
Last active September 17, 2022 10:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhewedy/e4fb38ad944e6d4754a5f38a3b9f4771 to your computer and use it in GitHub Desktop.
Save mhewedy/e4fb38ad944e6d4754a5f38a3b9f4771 to your computer and use it in GitHub Desktop.
Multi-rabbit config for spring amqp
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.MultiRabbitListenerAnnotationBeanPostProcessor;
import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Configures several RabbitMQ connections (hosts and/or virtual hosts) side by side.
 * It excludes {@link RabbitAutoConfiguration} and performs the configuration manually,
 * based on {@link MultiRabbitProps}.
 * <p>
 * This configuration has nothing to do with
 * <a href="https://github.com/freenowtech/spring-multirabbit">freenowtech/spring-multirabbit</a>;
 * in fact, we migrated away from that library as it is no longer supported.
 * <p>
 * For every entry {@code <key>} under {@code spring.multirabbitmq.connections} the following
 * singletons are registered:
 * <ul>
 * <li>{@code <key>-connectionFactory} - the caching connection factory,</li>
 * <li>{@code <key>-admin} - a {@link RabbitAdmin} on that connection factory,</li>
 * <li>{@code <key>} - the listener container factory.</li>
 * </ul>
 * In addition a routing connection factory ({@code rcf}) and a {@code rabbitTemplate} built on it
 * are registered. The first configured connection is the routing factory's default target.
 * <p>
 * Usage:
 * <pre>
 * # in application.yaml
 * spring:
 *   multirabbitmq:
 *     enabled: true
 *     connections:
 *       first:
 *         host: localhost
 *       second:
 *         host: localhost
 *         virtualHost: myvhost
 *       third:
 *         host: anotherHost
 * </pre>
 *
 * @see <a href="https://docs.spring.io/spring-amqp/reference/html/#multi-rabbit">spring rabbit docs</a>
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnClass(RabbitTemplate.class)
@EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class)
@EnableConfigurationProperties(MultiRabbitConfig.MultiRabbitProps.class)
@ConditionalOnProperty(name = "spring.multirabbitmq.enabled", havingValue = "true")
public class MultiRabbitConfig {

    /** Exchange that failed messages are republished to by the message recoverer. */
    private static final String DEAD_LETTER_EXCHANGE = "general_dl_exchange";

    /** Suffix appended to a connection key to form the connection factory bean name. */
    private static final String CONNECTION_FACTORY_SUFFIX = "-connectionFactory";

    /** Suffix appended to a connection key to form the admin bean name. */
    private static final String ADMIN_SUFFIX = "-admin";

    private final MultiRabbitProps props;
    private final ResourceLoader resourceLoader;
    private final ConfigurableBeanFactory configurableBeanFactory;
    private final ObjectProvider<MessageConverter> messageConverter;

    /**
     * Properties bound from the {@code spring.multirabbitmq} prefix.
     */
    @Data
    @ConfigurationProperties(prefix = "spring.multirabbitmq")
    public static class MultiRabbitProps {

        /** Whether the multi-rabbit configuration is enabled. */
        public boolean enabled;

        /**
         * Connection settings keyed by connection name. A {@link LinkedHashMap} keeps insertion
         * order, so the first inserted entry is deterministically the default connection.
         * Presumably the property binder inserts entries in declaration order - TODO confirm.
         */
        public Map<String, RabbitProperties> connections = new LinkedHashMap<>();
    }

    /**
     * Provides the registry that the listener annotation post processor is wired to.
     *
     * @return a new endpoint registry
     */
    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    /**
     * Creates the multi-rabbit aware listener annotation post processor.
     * <p>
     * NOTE(review): the container factory bean name is set to {@code "defaultContainerFactory"},
     * but no bean with that name is registered by this class; presumably every listener is
     * expected to name its container factory (a connection key) explicitly - confirm.
     *
     * @param registry the endpoint registry to attach to the post processor
     * @return the configured post processor
     */
    @Bean
    public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
        MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
        postProcessor.setEndpointRegistry(registry);
        postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
        return postProcessor;
    }

    /**
     * Builds and registers the per-connection beans, the routing connection factory and the
     * shared {@link RabbitTemplate}, then configures every listener container factory.
     * <p>
     * NOTE(review): beans are added through {@code registerSingleton}; presumably Spring does not
     * run initialization or destruction callbacks for such instances (e.g. closing connection
     * factories on shutdown, admin auto-declaration) - confirm against the Spring documentation.
     *
     * @throws IllegalStateException if no connection is configured
     */
    @PostConstruct
    public void registerBeans() {
        Map<String, RabbitProperties> connections = this.props.connections;
        if (connections == null || connections.isEmpty()) {
            // Fail fast with a readable message; otherwise picking the default connection below
            // would fail with an IndexOutOfBoundsException.
            throw new IllegalStateException(
                    "spring.multirabbitmq.enabled is true but spring.multirabbitmq.connections is empty");
        }

        final List<ContainerHolder> containerHolders = new ArrayList<>();
        for (Map.Entry<String, RabbitProperties> entry : connections.entrySet()) {
            String key = entry.getKey();
            RabbitProperties rabbitProperties = entry.getValue();
            log.debug("configuring rabbitmq => key: {} with host: {}, port: {}, vhost: {} username: {}",
                    key, rabbitProperties.determineHost(), rabbitProperties.determinePort(),
                    rabbitProperties.determineVirtualHost(), rabbitProperties.determineUsername());

            var connectionFactory = buildConnectionFactory(rabbitProperties);
            this.configurableBeanFactory.registerSingleton(key + CONNECTION_FACTORY_SUFFIX, connectionFactory);

            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            this.configurableBeanFactory.registerSingleton(key + ADMIN_SUFFIX, rabbitAdmin);

            // The container factory is registered under the bare connection key.
            var containerFactory = buildSimpleListenerContainerFactory(connectionFactory);
            this.configurableBeanFactory.registerSingleton(key, containerFactory);

            containerHolders.add(new ContainerHolder(key, rabbitProperties, connectionFactory, containerFactory));
        }

        var rcf = buildRoutingConnectionFactory(containerHolders);
        this.configurableBeanFactory.registerSingleton("rcf", rcf);

        var rabbitTemplate = new RabbitTemplate(rcf);
        this.messageConverter.ifUnique(rabbitTemplate::setMessageConverter);
        this.configurableBeanFactory.registerSingleton("rabbitTemplate", rabbitTemplate);

        configureContainerFactory(rabbitTemplate, containerHolders);
    }

    /**
     * Builds a caching connection factory from the given properties, following the same steps
     * as {@code RabbitAutoConfiguration}.
     *
     * @param rabbitProperties settings of a single connection
     * @return the configured caching connection factory
     * @throws IllegalStateException if the underlying client connection factory cannot be created
     */
    private CachingConnectionFactory buildConnectionFactory(RabbitProperties rabbitProperties) {
        com.rabbitmq.client.ConnectionFactory clientConnectionFactory;
        try {
            // see RabbitAutoConfiguration.java
            var factoryBeanConfigurer =
                    new RabbitConnectionFactoryBeanConfigurer(resourceLoader, rabbitProperties);
            var connectionFactoryBean = new RabbitConnectionFactoryBean();
            factoryBeanConfigurer.configure(connectionFactoryBean);
            connectionFactoryBean.afterPropertiesSet();
            clientConnectionFactory = connectionFactoryBean.getObject();
        } catch (Exception e) {
            throw new IllegalStateException("failed to build rabbitmq connection factory", e);
        }
        if (clientConnectionFactory == null) {
            // Thrown outside the try block so it is not wrapped a second time.
            throw new IllegalStateException("connectionFactory is null");
        }

        var factory = new CachingConnectionFactory(clientConnectionFactory);
        CachingConnectionFactoryConfigurer configurer = new CachingConnectionFactoryConfigurer(rabbitProperties);
        configurer.configure(factory);
        return factory;
    }

    /**
     * Builds the routing connection factory that maps every connection key to its connection
     * factory. The first holder's connection factory becomes the default target.
     *
     * @param cachingConnectionFactories holders of all configured connections; must not be empty
     * @return the routing connection factory
     * @throws IllegalStateException if the list is empty
     */
    private SimpleRoutingConnectionFactory buildRoutingConnectionFactory(
            List<ContainerHolder> cachingConnectionFactories) {
        if (cachingConnectionFactories.isEmpty()) {
            throw new IllegalStateException("at least one rabbitmq connection is required");
        }
        Map<Object, ConnectionFactory> targets = cachingConnectionFactories.stream()
                .collect(Collectors.toMap(it -> it.key, it -> it.connectionFactory));

        SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
        rcf.setDefaultTargetConnectionFactory(cachingConnectionFactories.get(0).connectionFactory);
        rcf.setTargetConnectionFactories(targets);
        return rcf;
    }

    /**
     * Creates a listener container factory bound to the given connection factory.
     *
     * @param connectionFactory the connection factory the containers will use
     * @return the new container factory
     */
    private SimpleRabbitListenerContainerFactory buildSimpleListenerContainerFactory(
            CachingConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /**
     * Applies the per-connection listener settings, the message converter (when exactly one is
     * available) and a republishing message recoverer to every container factory.
     *
     * @param rabbitTemplate   template used by the recoverer to republish failed messages
     * @param containerHolders holders of all configured connections
     */
    private void configureContainerFactory(
            RabbitTemplate rabbitTemplate, List<ContainerHolder> containerHolders) {
        for (ContainerHolder h : containerHolders) {
            var configurer = new MultiRabbitConfigurer(h.rabbitProperties, h.connectionFactory);
            messageConverter.ifUnique(configurer::setMessageConverter);
            configurer.setMessageRecoverer(
                    new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, DEAD_LETTER_EXCHANGE, h.key));
            configurer.configure(h.containerFactory, h.connectionFactory);
        }
    }

    /**
     * Groups everything that was created for a single connection key.
     */
    private record ContainerHolder(
            String key,
            RabbitProperties rabbitProperties,
            ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactory containerFactory) {
    }

    /**
     * Container factory configurer that applies the "simple" listener settings of one
     * connection's {@link RabbitProperties}.
     */
    @RequiredArgsConstructor
    private static class MultiRabbitConfigurer
            extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {

        private final RabbitProperties properties;
        private final ConnectionFactory connectionFactory;

        /**
         * Configures the factory from this configurer's own properties and connection factory.
         *
         * @param factory the container factory to configure
         * @param ignored not used; the connection factory given at construction time is applied
         */
        @Override
        public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) {
            RabbitProperties.SimpleContainer config = properties.getListener().getSimple();
            PropertyMapper map = PropertyMapper.get();
            configure(factory, connectionFactory, config);
            map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
            map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
            map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
            map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
        }

        // Plain delegation; presumably overridden only to make the setter callable from the
        // enclosing configuration class - confirm against the superclass visibility.
        @Override
        public void setMessageConverter(MessageConverter messageConverter) {
            super.setMessageConverter(messageConverter);
        }

        // Plain delegation; see the note on setMessageConverter.
        @Override
        public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
            super.setMessageRecoverer(messageRecoverer);
        }
    }

    /**
     * Recoverer that republishes a failed message through the connection identified by
     * {@code key}, by binding that key as the routing lookup key while sending.
     */
    private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer {

        private final String key;

        /**
         * @param rabbitTemplate template used to republish; its connection factory is the one the
         *                       lookup key is bound to
         * @param errorExchange  exchange failed messages are sent to
         * @param key            connection key to route the republished message through
         */
        public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) {
            super(rabbitTemplate, errorExchange);
            this.key = key;
        }

        /**
         * Sends the message with the consumer queue name appended to the routing key, while the
         * connection key is bound to the current thread.
         */
        @Override
        protected void doSend(String exchange, String routingKey, Message message) {
            ConnectionFactory routingFactory = ((RabbitTemplate) errorTemplate).getConnectionFactory();
            SimpleResourceHolder.bind(routingFactory, key);
            try {
                // routing key is: "error.<original queue name>"
                // NOTE(review): that shape assumes the incoming routingKey is just the "error."
                // prefix - confirm against RepublishMessageRecoverer.
                super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message);
            } finally {
                // Always unbind, so a failed publish does not leave the key bound to this thread.
                SimpleResourceHolder.unbind(routingFactory);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment