Created
June 10, 2016 01:20
-
-
Save burtonator/ebf06a7238bd9a1273853ce5282acf02 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Scenario: publish a single text message in a transacted session, then
// receive it and roll the session back several times, counting how often the
// consumer's MessageAvailableListener fires. The final assertion expects one
// listener callback per receive/rollback cycle.

// The connection URL sets the prefetch policy option "all" to 1.
String brokerURL = String.format( "tcp://%s:%s?jms.prefetchPolicy.all=1", TestWithEmbeddedBroker.BROKER_HOST, TestWithEmbeddedBroker.BROKER_PORT );
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( brokerURL );
Connection connection = connectionFactory.createConnection();
try {
    connection.start();

    // Transacted session: the send below only becomes visible on commit(),
    // and each rollback() in the loop returns the received message for redelivery.
    Session session = connection.createSession( true, Session.SESSION_TRANSACTED );
    Queue dest = session.createQueue( queueName );
    MessageProducer producer = session.createProducer( dest );

    String text = "Hello world!";
    Message message = session.createTextMessage(text);
    System.out.printf( "Sending message ...\n" );
    producer.send( message );
    session.commit();

    // The cast is required because setAvailableListener is called on the
    // ActiveMQ consumer class rather than on the JMS MessageConsumer interface.
    ActiveMQMessageConsumer messageConsumer = (ActiveMQMessageConsumer)session.createConsumer(dest);

    // Counts listener callbacks; an AtomicInteger is used so the anonymous
    // class can mutate it (the callback thread is not visible from here).
    final AtomicInteger messageAvailableEventsDelivered = new AtomicInteger(0);
    messageConsumer.setAvailableListener(new MessageAvailableListener() {
        @Override
        public void onMessageAvailable(MessageConsumer messageConsumer) {
            System.out.printf("GOT A MESSAGE AVAILABLE\n");
            messageAvailableEventsDelivered.getAndIncrement();
        }
    });

    int nrReceiveAndRollbacks = 5;
    // Upper bound per receive so a missing redelivery fails the test instead
    // of blocking forever. NOTE(review): presumably well above the configured
    // redelivery delay -- confirm against the broker/connection RedeliveryPolicy.
    long receiveTimeoutMillis = 10000L;

    for (int i = 0; i < nrReceiveAndRollbacks; i++) {
        message = messageConsumer.receive(receiveTimeoutMillis);
        if (message == null) {
            // receive(timeout) returns null when the timeout expires.
            throw new AssertionError("No message received within " + receiveTimeoutMillis
                    + " ms on receive/rollback iteration " + i);
        }
        System.out.printf("Received a message!\n");
        session.rollback();
    }

    assertEquals(nrReceiveAndRollbacks, messageAvailableEventsDelivered.get());
} finally {
    // Closing the connection also releases its sessions, producers and
    // consumers, so nothing leaks when an assertion or JMS call fails.
    connection.close();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment