Skip to content

Instantly share code, notes, and snippets.

@hgschmie
Created December 10, 2010 00:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hgschmie/735573 to your computer and use it in GitHub Desktop.
Save hgschmie/735573 to your computer and use it in GitHub Desktop.
public class ActiveMQReader implements Runnable {
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private boolean running = true;
private long lastEvent = 0L;
private long lastHit = 0L;
public ActiveMQReader() {
}
@Override
public void run() {
lastEvent = lastHit = System.nanoTime();
while (running) {
try {
try {
if (connection == null) {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic(topic));
backoff = 1;
}
final Message message = consumer.receive(TICK_TIMEOUT);
if (message == null) {
log.info("Did not receive a message within Ticker interval. Tracker down?");
}
else if (message instanceof TextMessage) {
final String msg = ((TextMessage) message).getText();
processMessage(msg);
}
} catch (JMSException je) {
log.warnDebug(je, "Could not connect to Broker, sleeping for %d seconds", backoff * 3);
Thread.sleep(3000L * backoff);
if (backoff != 1<<6) {
backoff <<= 1;
}
closeJMSConnection();
}
} catch (InterruptedException ie) {
running = false;
Thread.interrupted();
}
}
closeJMSConnection();
}
private void closeJMSConnection() {
closeQuietly(consumer);
closeQuietly(session);
closeQuietly(connection);
connection = null;
consumer = null;
session = null;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment