Skip to content

Instantly share code, notes, and snippets.

@mabn
Created October 16, 2014 10:57
Show Gist options
  • Save mabn/4178b9218d9be33d495e to your computer and use it in GitHub Desktop.
Save mabn/4178b9218d9be33d495e to your computer and use it in GitHub Desktop.
/**
 * Runs one reconnect pass.
 *
 * <p>While holding {@code reconnectMutex} the method:
 * <ol>
 *   <li>refreshes the URI list via {@code doUpdateURIsFromDisk()};</li>
 *   <li>returns immediately when a transport is already connected and neither a rebalance
 *       nor a priority backup is pending, or when this instance is disposed or has a
 *       recorded {@code connectionFailure};</li>
 *   <li>when a rebalance is requested, disposes the current transport unless it is already
 *       connected to the priority / first URI of the connect list;</li>
 *   <li>tries an already-waiting backup transport first (if any), otherwise waits for
 *       {@code reconnectDelay} (not on the first connection) and then walks the connect
 *       list until one transport starts successfully;</li>
 *   <li>on failure, increments {@code connectFailures} and, once the limit from
 *       {@code calculateReconnectAttemptLimit()} is reached, records the failure and hands
 *       it to {@code propagateFailureToExceptionListener}.</li>
 * </ol>
 * After releasing the lock, a failed pass calls {@code doDelay()} unless disposed.
 *
 * @return {@code false} when the pass ended with an early exit, a successful connection, or
 *         the reconnect limit being reached; otherwise {@code !disposed} (presumably a signal
 *         for the caller to schedule another pass - confirm against the caller)
 */
final boolean doReconnect() {
    Exception failure = null;
    synchronized (reconnectMutex) {
        // First ensure we are up to date.
        doUpdateURIsFromDisk();
        // Wake up anything waiting on reconnectMutex when no connection will ever be made.
        if (disposed || connectionFailure != null) {
            reconnectMutex.notifyAll();
        }
        if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
            return false;
        } else {
            List<URI> connectList = getConnectList();
            if (connectList.isEmpty()) {
                failure = new IOException("No uris available to connect to.");
            } else {
                if (doRebalance) {
                    if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
                        // already connected to first in the list, no need to rebalance
                        doRebalance = false;
                        return false;
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
                        }
                        try {
                            // Detach the current transport first so the connect loop below runs.
                            Transport transport = this.connectedTransport.getAndSet(null);
                            if (transport != null) {
                                disposeTransport(transport);
                            }
                        } catch (Exception e) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
                            }
                        }
                    }
                    doRebalance = false;
                }
                resetReconnectDelay();
                Transport transport = null;
                URI uri = null;
                // If we have a backup already waiting lets try it.
                synchronized (backupMutex) {
                    if ((priorityBackup || backup) && !backups.isEmpty()) {
                        ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
                        if (randomize) {
                            Collections.shuffle(l);
                        }
                        BackupTransport bt = l.remove(0);
                        backups.remove(bt);
                        transport = bt.getTransport();
                        uri = bt.getUri();
                        if (priorityBackup && priorityBackupAvailable) {
                            // Switching over to the priority backup: drop the current transport.
                            Transport old = this.connectedTransport.getAndSet(null);
                            if (old != null) {
                                disposeTransport(old);
                            }
                            priorityBackupAvailable = false;
                        }
                    }
                }
                // Sleep for the reconnectDelay if there's no backup and we aren't trying
                // for the first time, or we were disposed for some reason.
                if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
                    synchronized (sleepMutex) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
                        }
                        try {
                            sleepMutex.wait(reconnectDelay);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                Iterator<URI> iter = connectList.iterator();
                while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
                    try {
                        SslContext.setCurrentSslContext(brokerSslContext);
                        // We could be starting with a backup and if so we wait to grab a
                        // URI from the pool until next time around.
                        if (transport == null) {
                            uri = addExtraQueryOptions(iter.next());
                            transport = TransportFactory.compositeConnect(uri);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Attempting " + connectFailures + "th connect to: " + uri);
                        }
                        transport.setTransportListener(myTransportListener);
                        transport.start();
                        if (started && !firstConnection) {
                            restoreTransport(transport);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Connection established");
                        }
                        reconnectDelay = initialReconnectDelay;
                        connectedTransportURI = uri;
                        connectedTransport.set(transport);
                        connectedToPriority = isPriority(connectedTransportURI);
                        reconnectMutex.notifyAll();
                        connectFailures = 0;
                        // Make sure on initial startup, that the transportListener
                        // has been initialized for this instance.
                        synchronized (listenerMutex) {
                            if (transportListener == null) {
                                try {
                                    // if it isn't set after 2secs - it probably never will be
                                    listenerMutex.wait(2000);
                                } catch (InterruptedException ex) {
                                    // Keep the interrupt visible to the caller, matching
                                    // the handling of the sleepMutex wait above.
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                        if (transportListener != null) {
                            transportListener.transportResumed();
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("transport resumed but transport listener not set");
                            }
                        }
                        if (firstConnection) {
                            firstConnection = false;
                            LOG.info("Successfully connected to " + uri);
                        } else {
                            LOG.info("Successfully reconnected to " + uri);
                        }
                        connected = true;
                        return false;
                    } catch (Exception e) {
                        failure = e;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
                        }
                        if (transport != null) {
                            try {
                                transport.stop();
                            } catch (Exception ee) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Stop of failed transport: " + transport +
                                            " failed with reason: " + ee);
                                }
                            } finally {
                                // Always discard the failed transport, even when stop() throws;
                                // otherwise the loop condition stays true and the same broken
                                // transport would be restarted over and over under the lock.
                                transport = null;
                            }
                        }
                    } finally {
                        SslContext.setCurrentSslContext(null);
                    }
                }
            }
        }
        // Reaching this point means no connection was established during this pass.
        int reconnectLimit = calculateReconnectAttemptLimit();
        connectFailures++;
        if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
            LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
            connectionFailure = failure;
            // Make sure on initial startup, that the transportListener has been
            // initialized for this instance.
            synchronized (listenerMutex) {
                if (transportListener == null) {
                    try {
                        listenerMutex.wait(2000);
                    } catch (InterruptedException ex) {
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            propagateFailureToExceptionListener(connectionFailure);
            return false;
        }
        int warnInterval = getWarnAfterReconnectAttempts();
        if (warnInterval > 0 && (connectFailures % warnInterval) == 0) {
            LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.",
                    uris, connectFailures);
        }
    }
    // The back-off delay happens outside reconnectMutex.
    if (!disposed) {
        doDelay();
    }
    return !disposed;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment