Skip to content

Instantly share code, notes, and snippets.

@srikanthps
Created September 27, 2012 10:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save srikanthps/3793419 to your computer and use it in GitHub Desktop.
Save srikanthps/3793419 to your computer and use it in GitHub Desktop.
Try this program - Not able to make this one work
package org.trpr.platform.integration.messaging.test;
import java.util.List;
import org.trpr.platform.integration.impl.messaging.RabbitMQConfiguration;
import org.trpr.platform.integration.impl.messaging.RabbitMQMessageConsumerImpl;
import org.trpr.platform.integration.impl.messaging.RabbitMQMessagePublisherImpl;
import org.trpr.platform.integration.spi.messaging.MessagingException;
/**
* Test class for org.trpr.platform.integration.messaging.rabbitmq.RabbitMQMessagePublisherImpl
*
* @author Regunath B
*/
public class RabbitMQMessagePublisherTest {
/**
* Test method
*/
public static void main(String[] args) {
List<RabbitMQConfiguration> rabbitMQConfigurations = new java.util.LinkedList<RabbitMQConfiguration>();
RabbitMQConfiguration conf = new RabbitMQConfiguration();
conf.setHostName("localhost");
conf.setVirtualHost("/");
conf.setExchangeName("enrol");
conf.setExchangeType("direct");
conf.setDurable(true);
conf.setPortNumber(5672);
conf.setUserName("guest");
conf.setPassword("guest");
conf.setQueueName("helloInput");
conf.setRoutingKey("helloInput");
conf.setDurableMessageCommitCount(100);
rabbitMQConfigurations.add(conf);
final RabbitMQMessagePublisherImpl publisher = new RabbitMQMessagePublisherImpl();
publisher.setRabbitMQConfigurations(rabbitMQConfigurations);
final RabbitMQMessageConsumerImpl consumer = new RabbitMQMessageConsumerImpl();
consumer.setRabbitMQConfigurations(rabbitMQConfigurations);
//Consume any messages that are already in the queue
while(consumer.getQueueDepth() > 1) {
System.out.println(consumer.getQueueDepth() + " messages still in queue");
System.out.println("Consuming : " + consumer.consumeString());
}
// Publish and consume 100 messages
for (int i = 0; i < 100; i++) {
publisher.publishString("Message # " + i);
System.out.println(consumer.consumeString());
}
// finally close connections
publisher.closeConnections();
consumer.closeConnections();
}
}
@regunathb
Copy link

The issue is with the commit count in the configuration that has been set to 100. Consumer does not see durable messages until they are committed.

@regunathb
Copy link

Changed code:

package org.trpr.platform.integration.messaging.test;

import java.util.List;

import org.trpr.platform.integration.impl.messaging.RabbitMQConfiguration;
import org.trpr.platform.integration.impl.messaging.RabbitMQMessageConsumerImpl;
import org.trpr.platform.integration.impl.messaging.RabbitMQMessagePublisherImpl;
import org.trpr.platform.integration.spi.messaging.MessagingException;

/**

  • Test class for org.trpr.platform.integration.messaging.rabbitmq.RabbitMQMessagePublisherImpl

  • @author Regunath B
    */
    public class RabbitMQMessagePublisherTest {

    /**

    • Test method
      */
      public static void main(String[] args) {

      List rabbitMQConfigurations = new java.util.LinkedList();
      RabbitMQConfiguration conf = new RabbitMQConfiguration();
      conf.setHostName("localhost");
      conf.setVirtualHost("/");
      conf.setExchangeName("enrol");
      conf.setExchangeType("direct");
      conf.setDurable(true);
      conf.setPortNumber(5672);
      conf.setUserName("guest");
      conf.setPassword("guest");
      conf.setQueueName("helloInput");
      conf.setRoutingKey("helloInput");
      rabbitMQConfigurations.add(conf);

      final RabbitMQMessagePublisherImpl publisher = new RabbitMQMessagePublisherImpl();
      publisher.setRabbitMQConfigurations(rabbitMQConfigurations);

      final RabbitMQMessageConsumerImpl consumer = new RabbitMQMessageConsumerImpl();
      consumer.setRabbitMQConfigurations(rabbitMQConfigurations);

      //Consume any messages that are already in the queue
      while(consumer.getQueueDepth() > 1) {
      System.out.println(consumer.getQueueDepth() + " messages still in queue");
      System.out.println("Consuming : " + consumer.consumeString());
      }

      // Publish and consume 100 messages
      for (int i = 0; i < 10; i++) {
      publisher.publishString("Message # " + i);
      System.out.println("Consuming : " + consumer.consumeString());
      }

      // finally close connections
      publisher.closeConnections();
      consumer.closeConnections();

    }

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment