Skip to content

Instantly share code, notes, and snippets.

@dsyer
Created January 17, 2011 11:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dsyer/782739 to your computer and use it in GitHub Desktop.
Save dsyer/782739 to your computer and use it in GitHub Desktop.
Test project for ActiveMQ topic memory usage
target
activemq-data
bin
.settings
.classpath
.project
<?xml version="1.0" encoding="UTF-8"?>
<beansProjectDescription>
<version>1</version>
<pluginVersion><![CDATA[2.5.1.201011101000-RELEASE]]></pluginVersion>
<configSuffixes>
<configSuffix><![CDATA[xml]]></configSuffix>
</configSuffixes>
<enableImports><![CDATA[true]]></enableImports>
<configs>
<config>src/main/resources/META-INF/spring/jms-context.xml</config>
<config>src/main/resources/META-INF/spring/broker-single-context.xml</config>
</configs>
<configSets>
</configSets>
</beansProjectDescription>

Test project for ActiveMQ topic memory usage. The problem is that 5.4.x seems to have a memory leak (non-zero memory usage even in a quiescent system) in a networked broker, but not a single broker.

To use: set -DENVIRONMENT=...

  • unset (the default): run with a local vm: broker
  • single a single tcp broker
  • double a network of 2 brokers (one and two).

JMX is always enabled and you can see the topic and its memory usage in JConsole. The unit test also uses the MBean to make assertions.

Current status:

  • Works fine with single broker with any persistence setting
  • Networked brokers fail if non-persistent with 5.4 (OK with 5.3) with <broker persistent="false" .../>.

The networked brokers also occasionally fail the assertion about received messages - i.e. the listeners do not fire. Strange (but irrelevant)?

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="topicViewOne" class="org.springframework.jmx.access.MBeanProxyFactoryBean">
<property name="proxyInterface"
value="org.apache.activemq.broker.jmx.TopicViewMBean" />
<property name="server" ref="mBeanServer" />
<property name="objectName"
value="org.apache.activemq:BrokerName=one,Destination=topic,Type=Topic" />
</bean>
<bean id="topicViewTwo" class="org.springframework.jmx.access.MBeanProxyFactoryBean">
<property name="proxyInterface"
value="org.apache.activemq.broker.jmx.TopicViewMBean" />
<property name="server" ref="mBeanServer" />
<property name="objectName"
value="org.apache.activemq:BrokerName=two,Destination=topic,Type=Topic" />
</bean>
<broker id="brokerService" xmlns="http://activemq.apache.org/schema/core"
useJmx="true" brokerName="one">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="topic" memoryLimit="${topic.limit:1kb}">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<topic physicalName="topic" />
</destinations>
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<networkConnectors>
<networkConnector uri="${network.uri.1:static:(tcp://0.0.0.0:61617)}"
prefetchSize="1" />
</networkConnectors>
<transportConnectors>
<transportConnector uri="${transport.uri.1:tcp://0.0.0.0:61616}" />
</transportConnectors>
</broker>
<broker xmlns="http://activemq.apache.org/schema/core" useJmx="true"
brokerName="two">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="topic" memoryLimit="${topic.limit:1kb}">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<topic physicalName="topic" />
</destinations>
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<networkConnectors>
<networkConnector uri="${network.uri.2:static:(tcp://0.0.0.0:61616)}"
prefetchSize="1" />
</networkConnectors>
<transportConnectors>
<transportConnector uri="${transport.uri.2:tcp://0.0.0.0:61617}" />
</transportConnectors>
</broker>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="topicView" class="org.springframework.jmx.access.MBeanProxyFactoryBean">
<property name="proxyInterface"
value="org.apache.activemq.broker.jmx.TopicViewMBean" />
<property name="server" ref="mBeanServer" />
<property name="objectName"
value="org.apache.activemq:BrokerName=single,Destination=topic,Type=Topic" />
</bean>
<broker id="brokerService" xmlns="http://activemq.apache.org/schema/core"
useJmx="true" brokerName="single">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="topic" memoryLimit="${topic.limit:1mb}"
enableAudit="false">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<topic physicalName="topic" />
</destinations>
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<transportConnectors>
<transportConnector uri="${transport.uri:vm://localhost}" />
</transportConnectors>
</broker>
</beans>
broker.uri=failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&jms.prefetchPolicy.all=1
transport.uri.1=tcp://0.0.0.0:61616
transport.uri.2=tcp://0.0.0.0:61617
topic.limit=1mb
broker.uri=tcp://localhost:61616
transport.uri=tcp://0.0.0.0:61616
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<import resource="broker-${ENVIRONMENT:single}-context.xml" />
<context:property-placeholder
location="environment-${ENVIRONMENT:vm}.properties"
system-properties-mode="OVERRIDE" ignore-resource-not-found="true" />
<jms:listener-container connection-factory="bareConnectionFactory"
destination-type="topic" concurrency="3-5" cache="consumer">
<jms:listener destination="topic" ref="listener" method="handle" />
</jms:listener-container>
<bean id="listener" class="org.springframework.samples.jms.MessagingTests$Listener" />
<context:mbean-server id="mBeanServer" />
<context:mbean-export server="mBeanServer" />
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="receiveTimeout" value="100" />
<property name="defaultDestinationName" value="topic" />
<property name="pubSubDomain" value="true" />
<property name="deliveryPersistent" value="false" />
<property name="explicitQosEnabled" value="true" />
</bean>
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="bareConnectionFactory" />
</bean>
<bean id="bareConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
depends-on="brokerService">
<property name="brokerURL" value="${broker.uri:vm://localhost?async=false&amp;broker.persistent=false}" />
<property name="alwaysSyncSend" value="${always.sync.send:true}" />
</bean>
</beans>
log4j.rootCategory=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{1}:%L - %m%n
log4j.category.org.apache.activemq=ERROR
log4j.category.org.apache.activemq.ActiveMQSession=DEBUG
log4j.category.org.apache.activemq.TransactionContext=DEBUG
# log4j.category.org.springframework=DEBUG
log4j.category.com.springsource.open=DEBUG
log4j.category.org.springframework.jdbc=DEBUG
log4j.category.org.springframework.transaction=DEBUG
log4j.category.org.springframework.jms=DEBUG
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.samples.jms;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "/META-INF/spring/jms-context.xml")
public class MessagingTests {
private static final int NUM = 100;
@BeforeClass
public static void clear() throws Exception {
FileUtils.deleteDirectory(new File("activemq-data"));
}
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Collection<TopicViewMBean> topicViews;
@Autowired
private DefaultMessageListenerContainer container;
@Before
public void onSetUp() throws Exception {
Listener.clear();
}
@Test
public void testMessaging() throws Exception {
assertTrue(container.isRunning());
for (int i = 0; i < NUM; i++) {
jmsTemplate.convertAndSend("topic", "foo" + i + ":" + BODY);
}
Thread.sleep(3000L);
container.stop();
for (TopicViewMBean topicView : topicViews) {
assertEquals(0, topicView.getMemoryPercentUsage());
assertEquals(0, topicView.getInFlightCount());
}
List<String> list = Listener.getMessages();
assertTrue("Insufficient messages received: " + list.size(),
NUM <= list.size());
}
public static class Listener {
private static List<String> msgs = new ArrayList<String>();
public static void clear() {
msgs.clear();
}
public static List<String> getMessages() {
return msgs;
}
public void handle(String message) {
msgs.add(Thread.currentThread().getName() + ":"
+ message.substring(0, message.indexOf(":")));
}
}
private static String BODY = "kj\\agdfkjz0gfdkzjsgfdkzjsf4vgkjzxgvkjxhbkseurvbxfd,gmhjg";
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples</groupId>
<artifactId>topic-memory</artifactId>
<version>2.0.0.CI-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ActiveMQ Topic Memory</name>
<description><![CDATA[Sample project showing topic memory usage
]]> </description>
<properties>
<maven.test.failure.ignore>true</maven.test.failure.ignore>
<spring.framework.version>3.0.5.RELEASE</spring.framework.version>
<junit.version>4.8.2</junit.version>
<activemq.version>5.4.2</activemq.version>
</properties>
<profiles>
<profile>
<id>strict</id>
<properties>
<maven.test.failure.ignore>false</maven.test.failure.ignore>
</properties>
</profile>
<profile>
<id>fast</id>
<properties>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1</version>
<exclusions>
<exclusion>
<groupId>avalon-framework</groupId>
<artifactId>avalon-framework</artifactId>
</exclusion>
<exclusion>
<groupId>logkit</groupId>
<artifactId>logkit</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.2.1.6</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>${activemq.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
</exclusion>
<exclusion>
<groupId>mx4j</groupId>
<artifactId>mx4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId> geronimo-j2ee-management_1.0_spec </artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</exclusion>
<exclusion>
<artifactId>org.osgi.core</artifactId>
<groupId>org.osgi</groupId>
</exclusion>
<exclusion>
<artifactId>jasypt</artifactId>
<groupId>org.jasypt</groupId>
</exclusion>
<exclusion>
<artifactId>icu4j</artifactId>
<groupId>com.ibm.icu</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- This is just for the JMS spec - could use javax.jms -->
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>mockobjects</groupId>
<artifactId>mockobjects-jdk1.4-j2ee1.3</artifactId>
</exclusion>
<exclusion>
<groupId>mockobjects</groupId>
<artifactId>mockobjects-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
<version>1.0.1</version>
</dependency>
<!-- Spring Dependencies -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.framework.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.framework.version}</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>apache-snapshots</id>
<url>https://repository.apache.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment