Skip to content

Instantly share code, notes, and snippets.

@mlui
Created February 8, 2011 23:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mlui/817505 to your computer and use it in GitHub Desktop.
Save mlui/817505 to your computer and use it in GitHub Desktop.
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.shopzilla.amqp.core;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.beans.factory.annotation.Required;
import java.io.IOException;
/**
 * Connection factory that attaches a {@link ShutdownListener} to every connection it prepares.
 * When the connection to RabbitMQ shuts down for any reason other than a clean shutdown, the
 * listener keeps trying to bring the configured container (or, if none is configured, the
 * connection itself) back up, pausing {@code retryDelay} milliseconds between attempts.
 *
 * @author Mark Lui
 * @since Jan 5, 2011
 */
public class AutoRetryConnectionFactory extends SingleConnectionFactory {
    private static final Log LOG = LogFactory.getLog(AutoRetryConnectionFactory.class);

    /** Text looked for in the shutdown message to recognise a deliberate, clean close. */
    private static final String CLEAN_SHUTDOWN_MARKER = "clean connection shutdown";

    private long retryDelay;
    private AutoRetryMessageListenerContainer container;

    /**
     * @param hostName host name passed straight through to {@link SingleConnectionFactory}
     */
    public AutoRetryConnectionFactory(String hostName) {
        super(hostName);
    }

    /**
     * Registers the auto-retry shutdown listener on the given connection.
     * <p>
     * The listener receives the container and retry delay as they are at the moment this method
     * runs; later calls to the setters do not affect listeners that were already registered.
     *
     * @param con the connection being prepared
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    protected void prepareConnection(Connection con) throws IOException {
        // Add a listener that fires whenever the connection to RabbitMQ breaks
        con.addShutdownListener(new AutoRetryShutdownListener(container, retryDelay, this));
    }

    /**
     * Shutdown listener that restarts the container, or resets the connection when no container
     * was supplied, until one attempt completes without throwing.
     */
    static class AutoRetryShutdownListener implements ShutdownListener {
        private final long retryDelay;
        private final AutoRetryMessageListenerContainer container;
        private final SingleConnectionFactory connectionFactory;

        /**
         * @param container         container to shut down and restart; may be {@code null}, in
         *                          which case only the connection is reset
         * @param retryDelay        pause between failed attempts, in milliseconds
         * @param connectionFactory factory whose connection is reset when there is no container
         */
        public AutoRetryShutdownListener(AutoRetryMessageListenerContainer container, long retryDelay,
                SingleConnectionFactory connectionFactory) {
            this.container = container;
            this.retryDelay = retryDelay;
            this.connectionFactory = connectionFactory;
        }

        /**
         * Runs the reconnection loop unless the shutdown was a clean one.
         * <p>
         * The loop blocks the calling thread until an attempt succeeds or the thread is
         * interrupted. On interruption the interrupt flag is restored and the loop gives up.
         *
         * @param cause the shutdown signal reported by the RabbitMQ client
         */
        public void shutdownCompleted(ShutdownSignalException cause) {
            // The message is inspected to decide whether reconnection should run at all.
            // A missing message is treated as "not clean" so that we err on the side of reconnecting.
            String exceptionMessage = (cause == null) ? null : cause.getMessage();
            if (exceptionMessage != null && exceptionMessage.indexOf(CLEAN_SHUTDOWN_MARKER) >= 0) {
                return;
            }

            while (true) {
                try {
                    if (container != null) {
                        // Container must be shut down first to allow a restart
                        container.shutdown();
                        container.start();
                    } else {
                        connectionFactory.resetConnection();
                    }
                    return;
                } catch (Throwable ex) {
                    // Keep the cause in the log so repeated failures can be diagnosed
                    LOG.warn(String.format("Problem connecting with RabbitMQ waiting %d ms", retryDelay), ex);
                    try {
                        Thread.sleep(retryDelay);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop: continuing would make every following
                        // sleep fail immediately and turn this into a busy loop.
                        Thread.currentThread().interrupt();
                        LOG.warn("Interrupted while waiting to reconnect to RabbitMQ; giving up");
                        return;
                    }
                }
            }
        }
    }

    /**
     * @param container container the shutdown listener should restart; optional
     */
    public void setContainer(AutoRetryMessageListenerContainer container) {
        this.container = container;
    }

    /**
     * @param retryDelay pause between reconnection attempts, in milliseconds
     */
    @Required
    public void setRetryDelay(long retryDelay) {
        this.retryDelay = retryDelay;
    }
}
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.shopzilla.amqp.core;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
/**
 * Message listener container that keeps retrying its own initialisation until it succeeds and
 * that clears leftover internal state on shutdown so it can be started again.
 *
 * @author Mark Lui
 * @since Jan 5, 2011
 */
public class AutoRetryMessageListenerContainer extends SimpleMessageListenerContainer {
    private static final Log LOG = LogFactory.getLog(AutoRetryMessageListenerContainer.class);

    private long retryDelay;
    private List<Queue> targetQueue;
    private RabbitAdmin amqpAdmin;

    public AutoRetryMessageListenerContainer() {
        super();
    }

    /**
     * Creates the container and registers it with the given factory via
     * {@link AutoRetryConnectionFactory#setContainer}.
     *
     * @param connectionFactory factory used by this container
     */
    public AutoRetryMessageListenerContainer(AutoRetryConnectionFactory connectionFactory) {
        super(connectionFactory);
        connectionFactory.setContainer(this);
    }

    /**
     * Capture exceptions from property set if RabbitMQ is down on startup. Will retry until
     * reconnect is possible, sleeping {@code retryDelay} milliseconds between attempts.
     * <p>
     * After the superclass initialisation succeeds, every configured target queue is declared
     * through the {@link RabbitAdmin}. Target queues are optional; none configured means nothing
     * is declared.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting to retry
     */
    @Override
    public void afterPropertiesSet() {
        boolean started = false;
        while (!started) {
            try {
                super.afterPropertiesSet();
                // Guard against an unset queue list: without it the resulting NPE would be
                // caught below and retried forever.
                if (targetQueue != null) {
                    for (Queue queue : targetQueue) {
                        amqpAdmin.declareQueue(queue);
                    }
                }
                started = true;
            } catch (Throwable ex) {
                // Keep the cause in the log so repeated failures can be diagnosed
                LOG.warn(String.format("Problem connecting with RabbitMQ waiting %d ms", retryDelay), ex);
                try {
                    Thread.sleep(retryDelay);
                } catch (InterruptedException e) {
                    // Restore the flag and abort start-up instead of spinning on an
                    // already-interrupted thread.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to connect to RabbitMQ", e);
                }
            }
        }
    }

    /**
     * Augmented shutdown code to fix issues with current RabbitMQ container.
     * This may be unnecessary in later versions of Spring AMQP support after 1.0.0.M1
     * <p>
     * Resets the connection when the factory is a {@link SingleConnectionFactory}, delegates to
     * the superclass, then sets the inherited {@code consumers} and {@code channels} fields to
     * {@code null} via reflection. Fields that cannot be found are skipped.
     */
    @Override
    public void shutdown() {
        ConnectionFactory factory = getConnectionFactory();
        // The no-arg constructor allows any ConnectionFactory, so check before casting
        if (factory instanceof SingleConnectionFactory) {
            ((SingleConnectionFactory) factory).resetConnection();
        }
        super.shutdown();
        clearField("consumers");
        clearField("channels");
    }

    /** Sets the named field, looked up on this class and its superclasses, to {@code null} if it exists. */
    private void clearField(String fieldName) {
        Field field = ReflectionUtils.findField(this.getClass(), fieldName);
        if (field == null) {
            // Field may not exist in other versions of the superclass; nothing to clear
            LOG.debug("Field '" + fieldName + "' not found; skipping reset");
            return;
        }
        ReflectionUtils.makeAccessible(field);
        try {
            field.set(this, null);
        } catch (IllegalAccessException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * @param retryDelay pause between initialisation attempts, in milliseconds
     */
    @Required
    public void setRetryDelay(long retryDelay) {
        this.retryDelay = retryDelay;
    }

    /**
     * Adds a single queue to the list of queues declared on start-up.
     *
     * @param queue queue to add
     */
    public void setTargetQueue(Queue queue) {
        if (targetQueue == null) {
            targetQueue = new LinkedList<Queue>();
        }
        targetQueue.add(queue);
    }

    /**
     * Replaces the list of queues declared on start-up. The list is copied so that a later
     * {@link #setTargetQueue(Queue)} neither modifies the caller's list nor fails on an
     * unmodifiable one.
     *
     * @param queues queues to declare; {@code null} clears the list
     */
    public void setTargetQueueList(List<Queue> queues) {
        targetQueue = (queues == null) ? null : new LinkedList<Queue>(queues);
    }

    /**
     * @param amqpAdmin admin used to declare the target queues
     */
    @Required
    public void setAmqpAdmin(RabbitAdmin amqpAdmin) {
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Wrapper final method isRunning to allow for unit testing
     * @return boolean is container is running
     */
    public boolean isEnabled() {
        return super.isRunning();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment