Skip to content

Instantly share code, notes, and snippets.

@AlejandroRivera
Created October 7, 2016 15:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save AlejandroRivera/34235c35bb62ab572932b373444420a0 to your computer and use it in GitHub Desktop.
Save AlejandroRivera/34235c35bb62ab572932b373444420a0 to your computer and use it in GitHub Desktop.
Apache Qpid as embedded MQ broker
package com.redmart.commons.test.mq;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.util.ContextInitializer;
import ch.qos.logback.core.joran.spi.JoranException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.redmart.commons.test.integration.RandomPortProvider;
import org.apache.commons.io.IOUtils;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Map;
import java.util.function.Supplier;
/**
 * A JUnit resource that starts an embedded Apache Qpid message queue broker before the tests run
 * and shuts it down (deleting its temporary files) afterwards.
 *
 * <pre><code>
 * public class MyTest {
 *   {@literal @}ClassRule
 *   public static EmbeddedMessageQueueResource mqBroker = new EmbeddedMessageQueueResource();
 *
 *   private static Connection connection;
 *
 *   {@literal @}BeforeClass
 *   public static void setupClass() throws Exception {
 *     ConnectionFactory connectionFactory = new ConnectionFactory();
 *     connectionFactory.setHost("localhost");
 *     connectionFactory.setPort(mqBroker.getAmqpPort());
 *     connectionFactory.setVirtualHost(mqBroker.getVirtualHost());
 *     connectionFactory.setUsername(mqBroker.getUsername());
 *     connectionFactory.setPassword(mqBroker.getPassword());
 *
 *     connection = connectionFactory.newConnection();
 *     // ...
 *   }
 *
 *   {@literal @}AfterClass
 *   public static void afterClass() throws Exception {
 *     connection.close();
 *     // ...
 *   }
 *
 *   {@literal @}Test
 *   public void testSomething() throws Exception {
 *     Channel channel = connection.createChannel();
 *     // ...
 *   }
 * }
 * </code></pre>
 */
public class EmbeddedMessageQueueResource extends ExternalResource {

  /** Virtual host configuration used by the convenience constructors: {@code {"type": "Memory"}}. */
  public static final Map<String, Object> DEFAULT_VHOST_CONFIG = ImmutableMap.of("type", "Memory");

  private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedMessageQueueResource.class);

  private final int amqpPort;
  private final String virtualHost;
  private final String username;
  private final String password;
  private final Map<String, Object> virtualHostConfig;
  private final TemporaryFolder temporaryFolder;

  /** The broker started by {@link #before()}; {@code null} while no broker is running. */
  private Broker mqBroker;

  /** Creates a resource whose AMQP port is obtained from a new {@link RandomPortProvider}. */
  public EmbeddedMessageQueueResource() {
    this(new RandomPortProvider());
  }

  /**
   * Creates a resource whose AMQP port is obtained from the given supplier.
   *
   * @param supplier queried exactly once, at construction time, for the AMQP port
   */
  public EmbeddedMessageQueueResource(Supplier<Integer> supplier) {
    this(supplier.get());
  }

  /**
   * Creates a resource listening on the given port, using virtual host {@code "default"},
   * credentials {@code guest}/{@code guest}, {@link #DEFAULT_VHOST_CONFIG} and a fresh
   * {@link TemporaryFolder}.
   *
   * @param port the AMQP port handed to the broker
   */
  public EmbeddedMessageQueueResource(int port) {
    this(port, "default", "guest", "guest", DEFAULT_VHOST_CONFIG, new TemporaryFolder());
  }

  /**
   * Creates a fully customised resource.
   *
   * @param amqpPort the AMQP port handed to the broker
   * @param virtualHost name of the virtual host handed to the broker
   * @param username user name written to the broker's {@code passwd} file
   * @param password password written to the broker's {@code passwd} file
   * @param virtualHostConfig virtual host settings; serialised to JSON and handed to the broker
   * @param temporaryFolder folder in which the broker's home and work directories are created;
   *     it is created in {@link #before()} and deleted in {@link #after()}
   */
  public EmbeddedMessageQueueResource(int amqpPort, String virtualHost, String username, String password,
      Map<String, Object> virtualHostConfig, TemporaryFolder temporaryFolder) {
    this.amqpPort = amqpPort;
    this.virtualHost = virtualHost;
    this.virtualHostConfig = virtualHostConfig;
    this.username = username;
    this.password = password;
    this.temporaryFolder = temporaryFolder;
  }

  /**
   * Prepares the broker's directories and configuration files, then starts the broker.
   *
   * <p>If anything fails, the temporary folder is deleted (and the broker shut down, if it had
   * already started) before the failure is rethrown.</p>
   */
  @Override
  protected void before() throws Throwable {
    super.before();
    try {
      startBroker();
    } catch (Throwable failure) {
      // JUnit does not invoke after() when before() throws, so release what was acquired here.
      try {
        releaseResources();
      } catch (RuntimeException cleanupFailure) {
        failure.addSuppressed(cleanupFailure);
      }
      throw failure;
    }
  }

  /** Writes the configuration files into the temporary folder and starts the broker. */
  private void startBroker() throws Exception {
    temporaryFolder.create();
    final File homeDirectory = temporaryFolder.newFolder("home");
    final File workDirectory = temporaryFolder.newFolder("wordkir");
    final File configFile = new File(homeDirectory, "conf.json");

    String config = Resources.toString(Resources.getResource("apache-qpid/conf.json"), Charsets.UTF_8);
    writeUtf8(configFile, config);
    writeUtf8(new File(homeDirectory, "passwd"), this.username + ":" + this.password);

    BrokerOptions options = new BrokerOptions();
    options.setConfigProperty("qpid.work_dir", workDirectory.getAbsolutePath());
    options.setConfigProperty("qpid.home_dir", homeDirectory.getAbsolutePath());
    options.setConfigProperty("qpid.amqp_port", String.valueOf(amqpPort));
    options.setConfigProperty("qpid.virtual_host", this.virtualHost);
    options.setConfigProperty("qpid.virtual_host_json_config",
        new ObjectMapper().writeValueAsString(this.virtualHostConfig));
    options.setConfigProperty("queue.deadLetterQueueEnabled", "true");
    options.setStartupLoggedToSystemOut(false);
    options.setInitialConfigurationLocation(configFile.getAbsolutePath());

    Broker broker = new Broker();
    broker.startup(options);
    // Only remember the broker once startup() has returned normally, so that cleanup never calls
    // shutdown() on a broker whose startup failed.
    mqBroker = broker;

    reloadLogbackConfig();
    LOGGER.info("Embedded Message Queue Broker (Apache Qpid) started on port {} using username '{}'", amqpPort, username);
    LOGGER.warn("Apache Qpid isn't RabbitMQ! Learn more about it: https://www.rabbitmq.com/interoperability.html");
  }

  /**
   * Writes {@code content} to {@code target} as UTF-8, closing the stream even if writing fails.
   */
  private static void writeUtf8(File target, String content) throws IOException {
    try (FileOutputStream out = new FileOutputStream(target)) {
      out.write(content.getBytes(Charsets.UTF_8));
    }
  }

  /**
   * Apache Qpid modifies the Logging configuration manually since it was designed to run as a standalone server.
   *
   * <p>We need to reload the config to go back to the expected settings. The reload is skipped
   * (with a logged message) when SLF4J is not backed by Logback or when no default Logback
   * configuration file can be found.</p>
   */
  private void reloadLogbackConfig() throws JoranException {
    Object loggerFactory = LoggerFactory.getILoggerFactory();
    if (!(loggerFactory instanceof LoggerContext)) {
      LOGGER.warn("SLF4J is not bound to Logback; skipping reload of the logging configuration.");
      return;
    }
    LoggerContext loggerContext = (LoggerContext) loggerFactory;
    ContextInitializer initializer = new ContextInitializer(loggerContext);
    URL configUrl = initializer.findURLOfDefaultConfigurationFile(true);
    if (configUrl == null) {
      LOGGER.error("Could not find the default Logback configuration to reload. This will reduce logging visibility.");
      return;
    }
    loggerContext.reset();
    initializer.configureByResource(configUrl);
  }

  /** Shuts the broker down and deletes the temporary folder. */
  @Override
  protected void after() {
    super.after();
    releaseResources();
  }

  /**
   * Shuts down the broker, if one is running, and deletes the temporary folder even when the
   * shutdown throws.
   */
  private void releaseResources() {
    Broker broker = mqBroker;
    mqBroker = null;
    try {
      if (broker != null) {
        broker.shutdown();
      }
    } finally {
      temporaryFolder.delete();
    }
  }

  /** Returns the AMQP port this resource was configured with. */
  public int getAmqpPort() {
    return amqpPort;
  }

  /** Returns the password written to the broker's {@code passwd} file. */
  public String getPassword() {
    return password;
  }

  /** Returns the user name written to the broker's {@code passwd} file. */
  public String getUsername() {
    return username;
  }

  /** Returns the name of the virtual host this resource was configured with. */
  public String getVirtualHost() {
    return virtualHost;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment