Skip to content

Instantly share code, notes, and snippets.

@rponte
Last active July 27, 2023 17:41
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rponte/ba5f2f569903e059c907 to your computer and use it in GitHub Desktop.
Save rponte/ba5f2f569903e059c907 to your computer and use it in GitHub Desktop.
Starting and stopping Embedded ActiveMQ Broker during integration tests with jUnit Rules.
package base.jms;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.usage.SystemUsage;
import org.junit.rules.ExternalResource;
public class ActiveMQBrokerRule extends ExternalResource {
private static final int STORAGE_LIMIT = 1024 * 1024 * 8; // 8mb
private BrokerService broker;
@Override
protected void before() throws Throwable {
broker = new BrokerService();
broker.addConnector(connector());
broker.setUseJmx(true);
broker.setPersistent(false);
configureStorage();
broker.start();
}
@Override
protected void after() {
if (broker != null) {
try {
broker.stop();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
public ActiveMQJMXUtils getActiveMQJMXUtils() {
return new ActiveMQJMXUtils(this.broker);
}
private TransportConnector connector() throws URISyntaxException {
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("vm://localhost")); // or tcp://localhost:0
return connector;
}
private void configureStorage() {
SystemUsage systemUsage = broker.getSystemUsage();
systemUsage.getStoreUsage().setLimit(STORAGE_LIMIT);
systemUsage.getTempUsage().setLimit(STORAGE_LIMIT);
}
}
package base.jms;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.web.BrokerFacadeSupport;
import org.apache.activemq.web.LocalBrokerFacade;
public class ActiveMQJMXUtils {
private final BrokerFacadeSupport facade;
public ActiveMQJMXUtils(BrokerService brokerService) {
this.facade = new LocalBrokerFacade(brokerService);
}
public void cleanUp(String queueName) {
try {
facade.purgeQueue(new ActiveMQQueue(queueName));
long size = getQueueSize(queueName);
if (size > 0)
throw new IllegalStateException("It was not possible to clean up the queue '" + queueName + "'.");
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
public long getQueueSize(String queueName) {
try {
QueueViewMBean queue = facade.getQueue(queueName);
return (queue != null ? queue.getQueueSize() : 0);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
import org.junit.ClassRule;
public class MyIntegrationTest {
@ClassRule
public static ActiveMQBrokerRule brokerRule = new ActiveMQBrokerRule();
@Before
public void setup() {
// using helper to clean up a queue
brokerRule.getActiveMQJMXUtils().cleanUp("my.queue");
}
@Test
public void myTest() {
// do something here
assertEquals("total of messages in queue", 4, brokerRule.getActiveMQJMXUtils().getQueueSize("my.queue"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment