Created December 6, 2016 06:11
Autoreconnect rabbitmq
import com.rabbitmq.client.*;
import com.rabbitmq.utility.Utility;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
* Created by dan on 2016/10/19.
public class BlockableQueueConsumer extends DefaultConsumer {
private final BlockingQueue<QueueingConsumer.Delivery> _queue;
// When this is non-null the queue is in shutdown mode and nextDelivery should
// throw a shutdown signal exception.
private volatile ShutdownSignalException _shutdown;
private volatile ConsumerCancelledException _cancelled;
// Marker object used to signal the queue is in shutdown mode.
// It is only there to wake up consumers. The canonical representation
// of shutting down is the presence of _shutdown.
// Invariant: This is never on _queue unless _shutdown != null.
private static final QueueingConsumer.Delivery POISON = new QueueingConsumer.Delivery(null, null, null);
public BlockableQueueConsumer(Channel ch) {
this(ch, new LinkedBlockingQueue<QueueingConsumer.Delivery>());
public BlockableQueueConsumer(Channel ch, BlockingQueue<QueueingConsumer.Delivery> q) {
this._queue = q;
@Override public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
_shutdown = sig;
@Override public void handleCancel(String consumerTag) throws IOException {
_cancelled = new ConsumerCancelledException();
@Override public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
if (this.getChannel().isOpen()) {
this._queue.add(new QueueingConsumer.Delivery(envelope, properties, body));
* If delivery is not POISON nor null, return it.
* <p/>
* If delivery, _shutdown and _cancelled are all null, return null.
* <p/>
* If delivery is POISON re-insert POISON into the queue and
* throw an exception if POISONed for no reason.
* <p/>
* Otherwise, if we are in shutdown mode or cancelled,
* throw a corresponding exception.
private QueueingConsumer.Delivery handle(QueueingConsumer.Delivery delivery) {
if (delivery == POISON ||
delivery == null && (_shutdown != null || _cancelled != null)) {
if (delivery == POISON) {
if (_shutdown == null && _cancelled == null) {
throw new IllegalStateException(
"POISON in queue, but null _shutdown and null _cancelled. " +
"This should never happen, please report as a BUG");
if (null != _shutdown) {
throw Utility.fixStackTrace(_shutdown);
if (null != _cancelled)
throw Utility.fixStackTrace(_cancelled);
return delivery;
* Main application-side API: wait for the next message delivery and return it.
* @return the next message
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
public QueueingConsumer.Delivery nextDelivery()
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
return handle(_queue.take());
* Main application-side API: wait for the next message delivery and return it.
* @param timeout timeout in millisecond
* @return the next message or null if timed out
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
public QueueingConsumer.Delivery nextDelivery(long timeout)
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
import com.rabbitmq.client.*;
import java.util.*;
* 采用 lyra的组件, 实现了auto reconnect
* 其实不用任何组件,只要修改 QueuingConsumer这个类, 就可以实现 auto reconnect
public class Consumer {
private static final String QUEUE_NAME = "10bei-test-local4";
private static Address[] getAddresses(String host) {
String[] strs = host.split(",");
List<Address> addresses = new ArrayList<>();
for(String str: strs) {
addresses.add(new Address(str.trim()));
return addresses.toArray(new Address[0]);
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection(getAddresses(""));
final Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;
channel.queueDeclare(queueName, true, false, true, null);
Producer.XT xt = Producer.XT.HEADERS;
switch (xt) {
case FANOUT:
channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
channel.queueBind(queueName, Producer.XCHG_NAME, "");
case DIRECT:
channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null);
channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, Producer.XCHG_NAME, "warning");
case TOPIC:
channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null);
channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue");
channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, false, null);
Map<String, Object> headers = new HashMap<String, Object>() {{
put("name", "test");
put("sex", "male");
put("x-match", "any");//all==匹配所有条件,any==匹配任意条件
channel.queueBind(queueName, Producer.XCHG_NAME, Producer.ROUTING_KEY, headers);
channel.basicQos(1); //server push消息时的队列长度
BlockableQueueConsumer consumer = new BlockableQueueConsumer(channel);
// 指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
