Created
September 11, 2015 23:13
-
-
Save burtonator/eb7a70e1750080ca621e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spinn3r.artemis.scheduler.activemq; | |
import org.apache.activemq.ActiveMQConnectionFactory; | |
import org.apache.activemq.RedeliveryPolicy; | |
import org.apache.activemq.broker.BrokerService; | |
import org.apache.activemq.util.IdGenerator; | |
import org.junit.Before; | |
import org.junit.Test; | |
import javax.jms.*; | |
/** | |
* | |
*/ | |
public class TestBasicRollbacks /* extends TestWithEmbeddedBroker */ { | |
IdGenerator idGen = new IdGenerator(); | |
String queueName = "test-basic-rollbacks"; | |
BrokerService broker; | |
String brokerURL = "vm://localhost?create=false"; | |
@Before | |
public void setUp() throws Exception { | |
broker = new BrokerService(); | |
broker.setPersistent( false ); | |
broker.setUseJmx( false ); | |
broker.start(); | |
broker.waitUntilStarted(); | |
} | |
@Test | |
public void test1() throws Exception { | |
Client client0 = new Client(); | |
Client client1 = new Client(); | |
// *** produce the first message... | |
client0.produce( "hello world" ); | |
// ** now consume it on the first connection but then rollback | |
System.out.printf( "consume on the first client then rollback ...\n" ); | |
Message message1 = client0.consume(); | |
client0.session.rollback(); | |
// ** now consume it on the second connection and then commit | |
System.out.printf( "Try to consume on the second connection ...\n" ); | |
Message message2 = client1.consume(); | |
client1.session.commit(); | |
} | |
// a fake client with a producer, session, consumer, etc. | |
class Client { | |
Connection connection; | |
Session session; | |
MessageProducer messageProducer; | |
public Client() throws Exception { | |
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( brokerURL ); | |
RedeliveryPolicy policy = new RedeliveryPolicy(); | |
policy.setInitialRedeliveryDelay( 1000 ); | |
policy.setBackOffMultiplier( 2 ); | |
policy.setUseExponentialBackOff( true ); | |
policy.setMaximumRedeliveries( 5 ); | |
connectionFactory.setUseRetroactiveConsumer( true ); | |
connectionFactory.setRedeliveryPolicy( policy ); | |
connection = connectionFactory.createConnection(); | |
connection.setClientID( idGen.generateId() ); | |
connection.start(); | |
session = connection.createSession( true, Session.SESSION_TRANSACTED ); | |
} | |
public Message consume() throws Exception { | |
Queue dest = session.createQueue( queueName ); | |
MessageConsumer consumer1 = session.createConsumer( dest ); | |
return consumer1.receive(); | |
} | |
public void produce( String text ) throws Exception { | |
Queue dest = session.createQueue( queueName ); | |
MessageProducer messageProducer = session.createProducer( dest ); | |
Message message = session.createTextMessage( text ); | |
messageProducer.send( message ); | |
session.commit(); | |
messageProducer.close(); | |
} | |
public void close() throws Exception { | |
messageProducer.close(); | |
session.close(); | |
connection.close(); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment