Last active
October 28, 2023 10:16
-
-
Save mhewedy/68a26c85764200f4f43a4939a9f95b5a to your computer and use it in GitHub Desktop.
Load RabbitMQ definitions JSON file on startup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import brave.spring.rabbit.SpringRabbitTracing; | |
import lombok.Data; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.amqp.core.Message; | |
import org.springframework.amqp.rabbit.annotation.MultiRabbitListenerAnnotationBeanPostProcessor; | |
import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor; | |
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; | |
import org.springframework.amqp.rabbit.connection.*; | |
import org.springframework.amqp.rabbit.core.RabbitAdmin; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; | |
import org.springframework.amqp.rabbit.retry.MessageRecoverer; | |
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; | |
import org.springframework.amqp.support.converter.MessageConverter; | |
import org.springframework.beans.factory.ObjectProvider; | |
import org.springframework.beans.factory.config.ConfigurableBeanFactory; | |
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; | |
import org.springframework.boot.autoconfigure.amqp.*; | |
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; | |
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | |
import org.springframework.boot.context.properties.ConfigurationProperties; | |
import org.springframework.boot.context.properties.EnableConfigurationProperties; | |
import org.springframework.boot.context.properties.PropertyMapper; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.core.io.ResourceLoader; | |
import javax.annotation.PostConstruct; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.stream.Collectors; | |
/**
 * Configures multiple RabbitMQ connections (hosts and/or vhosts). It disables the RabbitMQ
 * auto-configuration and performs the configuration manually, based on {@link MultiRabbitProps}.
 * <p>
 * This configuration has nothing to do with
 * <a href="https://github.com/freenowtech/spring-multirabbit">freenowtech/spring-multirabbit</a>;
 * in fact, we migrated away from that library because it is no longer supported.
 * <p>
 * Usage:
 * <pre>
 * # in application.yaml
 * spring:
 *   multirabbitmq:
 *     enabled: true
 *     connections:
 *       first:
 *         host: localhost
 *       second:
 *         host: localhost
 *         virtualHost: myvhost
 *       third:
 *         host: anotherHost
 * </pre>
 *
 * @see <a href="https://docs.spring.io/spring-amqp/reference/html/#multi-rabbit">spring rabbit docs</a>
 */
@Slf4j | |
@Configuration | |
@RequiredArgsConstructor | |
@ConditionalOnClass(RabbitTemplate.class) | |
@EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class) | |
@EnableConfigurationProperties(MultiRabbitConfig.MultiRabbitProps.class) | |
@ConditionalOnProperty(name = "spring.multirabbitmq.enabled", havingValue = "true") | |
public class MultiRabbitConfig { | |
private final MultiRabbitProps props; | |
private final ResourceLoader resourceLoader; | |
private final SpringRabbitTracing springRabbitTracing; | |
private final ConfigurableBeanFactory configurableBeanFactory; | |
private final ObjectProvider<MessageConverter> messageConverter; | |
@Data | |
@ConfigurationProperties(prefix = "spring.multirabbitmq") | |
public static class MultiRabbitProps { | |
public boolean enabled; | |
public Map<String, RabbitProperties> connections = new HashMap<>(); | |
} | |
@Bean | |
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() { | |
return new RabbitListenerEndpointRegistry(); | |
} | |
@Bean | |
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) { | |
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor | |
= new MultiRabbitListenerAnnotationBeanPostProcessor(); | |
postProcessor.setEndpointRegistry(registry); | |
postProcessor.setContainerFactoryBeanName("defaultContainerFactory"); | |
return postProcessor; | |
} | |
@PostConstruct | |
public void registerBeans() { | |
final List<ContainerHolder> containerHolders = new ArrayList<>(); | |
for (Map.Entry<String, RabbitProperties> entry : this.props.connections.entrySet()) { | |
String key = entry.getKey(); | |
RabbitProperties rabbitProperties = entry.getValue(); | |
log.debug("configuring rabbitmq => key: {} with host: {}, port: {}, vhost: {} username: {}", | |
key, rabbitProperties.determineHost(), rabbitProperties.determinePort(), | |
rabbitProperties.determineVirtualHost(), rabbitProperties.determineUsername()); | |
var connectionFactory = buildConnectionFactory(rabbitProperties); | |
this.configurableBeanFactory.registerSingleton(key + "-connectionFactory", connectionFactory); | |
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); | |
this.configurableBeanFactory.registerSingleton(key + "-admin", rabbitAdmin); | |
var containerFactory = buildSimpleListenerContainerFactory(connectionFactory); | |
this.configurableBeanFactory.registerSingleton(key, containerFactory); | |
containerHolders.add(new ContainerHolder(key, rabbitProperties, connectionFactory, containerFactory)); | |
} | |
var rcf = buildRoutingConnectionFactory(containerHolders); | |
this.configurableBeanFactory.registerSingleton("rcf", rcf); | |
var rabbitTemplate = new RabbitTemplate(rcf); | |
springRabbitTracing.decorateRabbitTemplate(rabbitTemplate); | |
this.messageConverter.ifUnique(rabbitTemplate::setMessageConverter); | |
this.configurableBeanFactory.registerSingleton("rabbitTemplate", rabbitTemplate); | |
configureContainerFactory(rabbitTemplate, containerHolders); | |
} | |
private CachingConnectionFactory buildConnectionFactory(RabbitProperties rabbitProperties) { | |
try { | |
// see RabbitAutoConfiguration.java | |
var rabbitConnectionFactoryBeanConfigurer = | |
new RabbitConnectionFactoryBeanConfigurer(resourceLoader, rabbitProperties); | |
var connectionFactoryBean = new RabbitConnectionFactoryBean(); | |
rabbitConnectionFactoryBeanConfigurer.configure(connectionFactoryBean); | |
connectionFactoryBean.afterPropertiesSet(); | |
com.rabbitmq.client.ConnectionFactory connectionFactory = connectionFactoryBean.getObject(); | |
if (connectionFactory == null) { | |
throw new RuntimeException("connectionFactory is null"); | |
} | |
var factory = new CachingConnectionFactory(connectionFactory); | |
CachingConnectionFactoryConfigurer configurer = new CachingConnectionFactoryConfigurer(rabbitProperties); | |
configurer.configure(factory); | |
return factory; | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private SimpleRoutingConnectionFactory buildRoutingConnectionFactory( | |
List<ContainerHolder> cachingConnectionFactories) { | |
Map<Object, ConnectionFactory> map = cachingConnectionFactories.stream() | |
.collect(Collectors.toMap(it -> it.key, it -> it.connectionFactory)); | |
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory(); | |
rcf.setDefaultTargetConnectionFactory(cachingConnectionFactories.get(0).connectionFactory); | |
rcf.setTargetConnectionFactories(map); | |
return rcf; | |
} | |
private SimpleRabbitListenerContainerFactory buildSimpleListenerContainerFactory( | |
CachingConnectionFactory connectionFactory) { | |
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); | |
containerFactory.setConnectionFactory(connectionFactory); | |
return containerFactory; | |
} | |
private void configureContainerFactory( | |
RabbitTemplate rabbitTemplate, List<ContainerHolder> containerHolders) { | |
for (ContainerHolder h : containerHolders) { | |
var configurer = new MultiRabbitConfigurer(h.rabbitProperties, h.connectionFactory); | |
messageConverter.ifUnique(configurer::setMessageConverter); | |
configurer.setMessageRecoverer( | |
new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, "general_dl_exchange", h.key)); | |
configurer.configure(h.containerFactory, h.connectionFactory); | |
springRabbitTracing.decorateSimpleRabbitListenerContainerFactory(h.containerFactory); | |
} | |
} | |
/**
 * Groups everything built for a single configured connection so later configuration steps can
 * work on the complete set.
 *
 * @param key               the connection name, i.e. its key in the configured connections map
 * @param rabbitProperties  the settings the connection was built from
 * @param connectionFactory the connection factory built for this connection
 * @param containerFactory  the listener container factory registered under {@code key}
 */
private record ContainerHolder(
        String key,
        RabbitProperties rabbitProperties,
        ConnectionFactory connectionFactory,
        SimpleRabbitListenerContainerFactory containerFactory) {
}
@RequiredArgsConstructor | |
private static class MultiRabbitConfigurer | |
extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> { | |
private final RabbitProperties properties; | |
private final ConnectionFactory connectionFactory; | |
@Override | |
public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) { | |
RabbitProperties.SimpleContainer config = properties.getListener().getSimple(); | |
PropertyMapper map = PropertyMapper.get(); | |
configure(factory, connectionFactory, config); | |
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers); | |
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers); | |
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize); | |
map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled); | |
} | |
@Override | |
public void setMessageConverter(MessageConverter messageConverter) { | |
super.setMessageConverter(messageConverter); | |
} | |
@Override | |
public void setMessageRecoverer(MessageRecoverer messageRecoverer) { | |
super.setMessageRecoverer(messageRecoverer); | |
} | |
} | |
private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer { | |
private final String key; | |
public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) { | |
super(rabbitTemplate, errorExchange); | |
this.key = key; | |
} | |
@Override | |
protected void doSend(String exchange, String routingKey, Message message) { | |
SimpleResourceHolder.bind(((RabbitTemplate) errorTemplate).getConnectionFactory(), key); | |
// routing key is: "error.<original queue name>" | |
super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message); | |
SimpleResourceHolder.unbind(((RabbitTemplate) errorTemplate).getConnectionFactory()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment