Skip to content

Instantly share code, notes, and snippets.

@clevertension
Created December 6, 2016 06:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clevertension/28f1af6bf28a7c474efc1bacf15d0147 to your computer and use it in GitHub Desktop.
Save clevertension/28f1af6bf28a7c474efc1bacf15d0147 to your computer and use it in GitHub Desktop.
Autoreconnect rabbitmq
import com.rabbitmq.client.*;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by dan on 2016/10/19.
*/
public class BlockableQueueConsumer extends DefaultConsumer {
private final BlockingQueue<QueueingConsumer.Delivery> _queue;
// When this is non-null the queue is in shutdown mode and nextDelivery should
// throw a shutdown signal exception.
private volatile ShutdownSignalException _shutdown;
private volatile ConsumerCancelledException _cancelled;
// Marker object used to signal the queue is in shutdown mode.
// It is only there to wake up consumers. The canonical representation
// of shutting down is the presence of _shutdown.
// Invariant: This is never on _queue unless _shutdown != null.
private static final QueueingConsumer.Delivery POISON = new QueueingConsumer.Delivery(null, null, null);
public BlockableQueueConsumer(Channel ch) {
this(ch, new LinkedBlockingQueue<QueueingConsumer.Delivery>());
}
public BlockableQueueConsumer(Channel ch, BlockingQueue<QueueingConsumer.Delivery> q) {
super(ch);
this._queue = q;
}
@Override public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
_shutdown = sig;
}
@Override public void handleCancel(String consumerTag) throws IOException {
_cancelled = new ConsumerCancelledException();
_queue.add(POISON);
}
@Override public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
if (this.getChannel().isOpen()) {
this._queue.add(new QueueingConsumer.Delivery(envelope, properties, body));
}
}
/**
* If delivery is not POISON nor null, return it.
* <p/>
* If delivery, _shutdown and _cancelled are all null, return null.
* <p/>
* If delivery is POISON re-insert POISON into the queue and
* throw an exception if POISONed for no reason.
* <p/>
* Otherwise, if we are in shutdown mode or cancelled,
* throw a corresponding exception.
*/
private QueueingConsumer.Delivery handle(QueueingConsumer.Delivery delivery) {
if (delivery == POISON ||
delivery == null && (_shutdown != null || _cancelled != null)) {
if (delivery == POISON) {
_queue.add(POISON);
if (_shutdown == null && _cancelled == null) {
throw new IllegalStateException(
"POISON in queue, but null _shutdown and null _cancelled. " +
"This should never happen, please report as a BUG");
}
}
if (null != _shutdown) {
throw Utility.fixStackTrace(_shutdown);
}
if (null != _cancelled)
throw Utility.fixStackTrace(_cancelled);
}
return delivery;
}
/**
* Main application-side API: wait for the next message delivery and return it.
* @return the next message
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public QueueingConsumer.Delivery nextDelivery()
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{
return handle(_queue.take());
}
/**
* Main application-side API: wait for the next message delivery and return it.
* @param timeout timeout in millisecond
* @return the next message or null if timed out
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public QueueingConsumer.Delivery nextDelivery(long timeout)
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
}
}
import com.rabbitmq.client.*;
import java.util.*;
/**
* 采用 lyra的组件, 实现了auto reconnect
* 其实不用任何组件,只要修改 QueuingConsumer这个类, 就可以实现 auto reconnect
*/
public class Consumer {
private static final String QUEUE_NAME = "10bei-test-local4";
private static Address[] getAddresses(String host) {
String[] strs = host.split(",");
List<Address> addresses = new ArrayList<>();
for(String str: strs) {
addresses.add(new Address(str.trim()));
}
return addresses.toArray(new Address[0]);
}
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("10bei");
factory.setPassword("10bei.cn");
factory.setVirtualHost("/10bei");
factory.setRequestedHeartbeat(5);
factory.setConnectionTimeout(5000);
factory.setAutomaticRecoveryEnabled(true);
Connection connection = factory.newConnection(getAddresses("192.168.1.4"));
final Channel channel = connection.createChannel();
String queueName = QUEUE_NAME;
//队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
channel.queueDeclare(queueName, true, false, true, null);
Producer.XT xt = Producer.XT.HEADERS;
switch (xt) {
case FANOUT:
//接收端也声明一个fanout交换机
channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null);
//channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
//声明一个临时队列,该队列会在使用完比后自动销毁
//将队列绑定到交换机,参数3无意义此时
channel.queueBind(queueName, Producer.XCHG_NAME, "");
break;
case DIRECT:
channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null);
channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
channel.queueBind(queueName, Producer.XCHG_NAME, "warning");
break;
case TOPIC:
channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null);
channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue");
break;
case HEADERS:
channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, false, null);
Map<String, Object> headers = new HashMap<String, Object>() {{
put("name", "test");
put("sex", "male");
put("x-match", "any");//all==匹配所有条件,any==匹配任意条件
}};
channel.queueBind(queueName, Producer.XCHG_NAME, Producer.ROUTING_KEY, headers);
break;
}
channel.basicQos(1); //server push消息时的队列长度
BlockableQueueConsumer consumer = new BlockableQueueConsumer(channel);
// 指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
//消息采用阻塞的方式
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment