Skip to content

Instantly share code, notes, and snippets.

@adamrabung
Created January 23, 2012 20:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamrabung/1665378 to your computer and use it in GitHub Desktop.
Save adamrabung/1665378 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitTest {
public static void publish(long waitForCallbacks, String exchange, String routingKey, String msg, Address brokerAddress) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection(new Address[] { brokerAddress });
Channel ch = conn.createChannel();
ch.confirmSelect();
final AtomicInteger callbacks = new AtomicInteger();
ch.addConfirmListener(new ConfirmListener() {
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
callbacks.incrementAndGet();
}
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
callbacks.incrementAndGet();
}
});
int PERSISTENT_MESSAGE = 2;
for (int i = 0; i < 500; i++) {
ch.basicPublish(exchange, routingKey, true, false, new AMQP.BasicProperties.Builder().deliveryMode(PERSISTENT_MESSAGE).build(), msg.getBytes());
}
Thread.sleep(waitForCallbacks); //wait for some of the callbacks to come in
long startClose = System.currentTimeMillis();
ch.close();
System.out.println(callbacks.get() + " callbacks, waited " + waitForCallbacks + "ms before close, close took " + (System.currentTimeMillis() - startClose));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment