Skip to content

Instantly share code, notes, and snippets.

@burtonator
Created June 10, 2016 01:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save burtonator/ebf06a7238bd9a1273853ce5282acf02 to your computer and use it in GitHub Desktop.
Save burtonator/ebf06a7238bd9a1273853ce5282acf02 to your computer and use it in GitHub Desktop.
// Scenario: publish one message inside a transacted session, then repeatedly receive it and
// roll back, counting how many "message available" callbacks the consumer fires. The final
// assertion expects exactly one callback per receive/rollback cycle.
//
// The broker URL sets jms.prefetchPolicy.all=1 so the consumer prefetches a single message.
String brokerURL = String.format( "tcp://%s:%s?jms.prefetchPolicy.all=1", TestWithEmbeddedBroker.BROKER_HOST, TestWithEmbeddedBroker.BROKER_PORT );
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( brokerURL );
Connection connection = connectionFactory.createConnection();
try {
    connection.start();

    // Transacted session: sends become visible on commit(); receives are undone by rollback().
    Session session = connection.createSession( true, Session.SESSION_TRANSACTED );
    Queue dest = session.createQueue( queueName );
    MessageProducer producer = session.createProducer( dest );

    String text = "Hello world!";
    Message message = session.createTextMessage(text);
    System.out.printf( "Sending message ...\n" );
    producer.send( message );
    session.commit();

    ActiveMQMessageConsumer messageConsumer = (ActiveMQMessageConsumer)session.createConsumer(dest);

    // Counts listener callbacks. AtomicInteger because the callback is presumably invoked
    // from a broker/transport thread rather than this one -- TODO confirm.
    AtomicInteger messageAvailableEventsDelivered = new AtomicInteger(0);
    messageConsumer.setAvailableListener(new MessageAvailableListener() {
        @Override
        public void onMessageAvailable(MessageConsumer messageConsumer) {
            System.out.printf("GOT A MESSAGE AVAILABLE\n");
            messageAvailableEventsDelivered.getAndIncrement();
        }
    });

    int nrReceiveAndRollbacks = 5;
    for (int i = 0; i < nrReceiveAndRollbacks; i++) {
        // Blocks until a message arrives; the message itself is not inspected, only the
        // fact that it was delivered matters here.
        messageConsumer.receive();
        System.out.printf("Received a message!\n");
        // Undo the receive so the same message is redelivered on the next iteration.
        session.rollback();
    }

    assertEquals(nrReceiveAndRollbacks, messageAvailableEventsDelivered.get());
} finally {
    // Always release the connection, even when a send/receive or the assertion fails.
    // Closing a JMS connection also closes the sessions, producers and consumers created from it.
    connection.close();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment