Skip to content

Instantly share code, notes, and snippets.

@fieldju
Created November 8, 2012 17:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fieldju/4040309 to your computer and use it in GitHub Desktop.
Save fieldju/4040309 to your computer and use it in GitHub Desktop.
ActiveMQBrokerMonitorThread
package cnwk.foreman.ingestion;
import cnwk.foreman.util.LangUtils;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by Justin Field | justin.field@cbsinteractive.com on 10/2/12
* Description: simple thread for handling the starting of the active mq
* broker so that it does not block foreman when it gets blocked after not getting
* the lock on the DB
*/
public class ActiveMQBrokerMonitorThread implements Runnable {
private static final Logger LOG = Logger.getLogger(ActiveMQBrokerMonitorThread.class);
private static final long ONE_MINUTE = 60000;
private AtomicBoolean running = new AtomicBoolean(true);
private BrokerService broker;
private ParallelIngestionService parallelIngestionService;
private BrokerFactory brokerFactory;
public Boolean isRunning() {
return running.get();
}
@SuppressWarnings({"UnusedDeclaration"})
public void setRunning(final Boolean running) {
this.running.set(running);
}
public ActiveMQBrokerMonitorThread(final BrokerFactory brokerFactory, final ParallelIngestionService parallelIngestionService) {
this.brokerFactory = brokerFactory;
this.parallelIngestionService = parallelIngestionService;
}
public void run() {
while (isRunning()) {
checkAndStartBroker();
try {
if (! broker.isStarted())
broker.waitUntilStarted();
} catch (Exception e) {
LOG.error("failed to wait for broker to start", e);
}
try {
Thread.sleep(ONE_MINUTE);
} catch (Exception e) {
LOG.error("sleep interrupted", e);
}
verifyMasterLock();
}
}
/**
* checks to see if the broker is started
* if not it tries to start it
*/
public void checkAndStartBroker() {
if (broker == null || ! broker.isStarted()) {
try {
broker = getNewBroker();
broker.start();
} catch (Exception e) {
LOG.error("Broker failed to start", e);
}
}
}
/**
* queries the DB to verify who has the master lock
* if its another broker and this is not a slave then it shuts down the broker
*/
public void verifyMasterLock() {
String currentLockHolder = parallelIngestionService.getActiveMQLockHolder();
String hostname = null;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
LOG.error("failed to get hostname", e);
}
// if the broker is not a slave and is not in the db as the master
// then we have to masters and this is bad
if ( ! StringUtils.equals(currentLockHolder, hostname) && ! broker.isSlave()) {
LOG.error("Broker: " + LangUtils.ifNull(broker.getBrokerName(), "null") +
" thinks its a master when the DB says Broker: " + currentLockHolder +
" is the master, restarting broker");
try {
broker.stopAllConnectors(new ServiceStopper());
broker.stop();
broker.waitUntilStopped();
} catch (Exception e) {
LOG.error("failed to stop broker", e);
}
}
}
// Please note that this is a custom brokerFactory and not the one that is included with ActiveMQ
private BrokerService getNewBroker() {
return brokerFactory.getBean();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment