Skip to content

Instantly share code, notes, and snippets.

@burtonator
Created September 11, 2015 23:13
Show Gist options
  • Save burtonator/eb7a70e1750080ca621e to your computer and use it in GitHub Desktop.
Save burtonator/eb7a70e1750080ca621e to your computer and use it in GitHub Desktop.
package com.spinn3r.artemis.scheduler.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IdGenerator;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
/**
 * Exercises transacted-session rollback against an embedded, non-persistent
 * ActiveMQ broker: one client produces a message, consumes it and rolls the
 * transaction back, then a second client consumes from the same queue and
 * commits.
 */
public class TestBasicRollbacks /* extends TestWithEmbeddedBroker */ {

    /** Upper bound, in milliseconds, on how long a single {@code consume()} call waits. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 10000;

    IdGenerator idGen = new IdGenerator();
    String queueName = "test-basic-rollbacks";
    BrokerService broker;
    String brokerURL = "vm://localhost?create=false";

    /** Starts a broker with persistence and JMX disabled and waits until it is up. */
    @Before
    public void setUp() throws Exception {
        broker = new BrokerService();
        broker.setPersistent( false );
        broker.setUseJmx( false );
        broker.start();
        broker.waitUntilStarted();
    }

    /**
     * Produces one message, consumes and rolls it back on the first client,
     * then consumes and commits it on the second client. Fails (instead of
     * hanging) when either client gets no message within the receive timeout.
     */
    @Test
    public void test1() throws Exception {
        Client client0 = null;
        Client client1 = null;
        try {
            client0 = new Client();
            client1 = new Client();

            // *** produce the first message...
            client0.produce( "hello world" );

            // ** now consume it on the first connection but then rollback
            System.out.printf( "consume on the first client then rollback ...\n" );
            Message message1 = client0.consume();
            requireDelivered( message1, "first client" );
            client0.session.rollback();

            // ** now consume it on the second connection and then commit
            System.out.printf( "Try to consume on the second connection ...\n" );
            Message message2 = client1.consume();
            requireDelivered( message2, "second client" );
            client1.session.commit();
        } finally {
            releaseResources( client0, client1 );
        }
    }

    /** Throws an {@link AssertionError} when {@code message} is null, i.e. the receive timed out. */
    private static void requireDelivered( Message message, String who ) {
        if ( message == null ) {
            throw new AssertionError(
                "no message delivered to the " + who + " within " + RECEIVE_TIMEOUT_MILLIS + " ms" );
        }
    }

    /**
     * Closes every non-null client and then stops the broker created in
     * {@link #setUp()}. The nested try/finally guarantees the broker is
     * stopped even when closing a client throws.
     */
    private void releaseResources( Client... clients ) throws Exception {
        try {
            closeFrom( clients, 0 );
        } finally {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /** Closes {@code clients[index..]}; a failure on one client does not skip the remaining ones. */
    private void closeFrom( Client[] clients, int index ) throws Exception {
        if ( index >= clients.length ) {
            return;
        }
        try {
            if ( clients[index] != null ) {
                clients[index].close();
            }
        } finally {
            closeFrom( clients, index + 1 );
        }
    }

    // a fake client with a connection and a transacted session; producers and
    // consumers are created per call and closed before the call returns.
    class Client {

        Connection connection;
        Session session;

        /**
         * Connects to {@code brokerURL} with an exponential-backoff redelivery
         * policy (1000 ms initial delay, multiplier 2, at most 5 redeliveries)
         * and opens a transacted session.
         */
        public Client() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( brokerURL );

            RedeliveryPolicy policy = new RedeliveryPolicy();
            policy.setInitialRedeliveryDelay( 1000 );
            policy.setBackOffMultiplier( 2 );
            policy.setUseExponentialBackOff( true );
            policy.setMaximumRedeliveries( 5 );

            connectionFactory.setUseRetroactiveConsumer( true );
            connectionFactory.setRedeliveryPolicy( policy );

            connection = connectionFactory.createConnection();
            connection.setClientID( idGen.generateId() );
            connection.start();

            session = connection.createSession( true, Session.SESSION_TRANSACTED );
        }

        /**
         * Receives one message from the test queue, waiting at most
         * {@code RECEIVE_TIMEOUT_MILLIS}. The caller decides the outcome via
         * {@code session.commit()} or {@code session.rollback()}.
         *
         * @return the received message, or {@code null} if the wait timed out
         */
        public Message consume() throws Exception {
            Queue dest = session.createQueue( queueName );
            MessageConsumer consumer = session.createConsumer( dest );
            try {
                return consumer.receive( RECEIVE_TIMEOUT_MILLIS );
            } finally {
                // Close the consumer so it does not linger on the queue after this
                // call returns; the original leaked one consumer per call.
                // NOTE(review): this assumes the provider keeps the received message
                // in the session's open transaction after the consumer is closed
                // (ActiveMQ appears to defer the actual close until commit/rollback)
                // -- confirm against the client version in use.
                consumer.close();
            }
        }

        /**
         * Sends {@code text} as a text message to the test queue and commits
         * the session. The producer is closed even when send or commit fails.
         */
        public void produce( String text ) throws Exception {
            Queue dest = session.createQueue( queueName );
            MessageProducer producer = session.createProducer( dest );
            try {
                Message message = session.createTextMessage( text );
                producer.send( message );
                session.commit();
            } finally {
                producer.close();
            }
        }

        /**
         * Closes the session and then the connection. The connection is closed
         * even if closing the session throws.
         */
        public void close() throws Exception {
            try {
                session.close();
            } finally {
                connection.close();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment