Skip to content

Instantly share code, notes, and snippets.

@jgoodyear
Created October 9, 2018 12:53
Show Gist options
  • Save jgoodyear/f3c716aec99c066894047ed996cdcdc9 to your computer and use it in GitHub Desktop.
Save jgoodyear/f3c716aec99c066894047ed996cdcdc9 to your computer and use it in GitHub Desktop.
A non-completing test case to show GC compaction on AMQ KahaDB PageFiles
@Test
public void testAMQ7067() throws Exception {
PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
setupXAConnection();
Queue holdKahaDb = xaSession.createQueue(“holdKahaDb”);
createDanglingTransaction(xaRes, xaSession, holdKahaDb);
for (Integer i = 0; i < 100; i++) {
MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
XATransactionId txid = createXATransaction();
System.out.println(“****** create new txid = ” + txid);
xaRes.start(txid, TMNOFLAGS);
TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat(“a”, 10));
holdKahaDbProducer.send(helloMessage);
xaRes.end(txid, TMSUCCESS);
xaRes.prepare(txid);
Queue queue = xaSession.createQueue(“test”);
produce(xaRes, xaSession, queue, 100, 512 * 1024);
xaRes.commit(txid, false);
produce(xaRes, xaSession, queue, 100, 512 * 1024);
((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
}
Xid[] xids = xaRes.recover(TMSTARTRSCAN);
//Should be 1 since we have only 1 prepared
assertEquals(1, xids.length);
connection.close();
//System.out.println(“Pending Transactions: ” + jolokia.getPendingTransactions());
broker.stop();
broker.waitUntilStopped();
createBroker();
setupXAConnection();
xids = xaRes.recover(TMSTARTRSCAN);
// THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION!
assertEquals(1, xids.length);
for (Xid xid : xids) {
xaRes.commit(xid, true);
}
CountDownLatch latch = new CountDownLatch(200);
ActiveMQConnectionFactory simpleConncetion = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY;
Connection simple = simpleConncetion.createConnection();
Session simpleSession = simple.createSession(false, Session.AUTO_ACKNOWLEDGE);
holdKahaDb = simpleSession.createQueue(“holdKahaDb”);
MessageConsumer consumer = simpleSession.createConsumer(holdKahaDb);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(“Message ” + message.toString());
latch.countDown();
} catch (Exception ex) {
assertTrue(false);
}
}
});
simple.start();
latch.await(1000l, TimeUnit.SECONDS);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment