Created
October 16, 2014 10:57
-
-
Save mabn/4178b9218d9be33d495e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final boolean doReconnect() { | |
Exception failure = null; | |
synchronized (reconnectMutex) { | |
// First ensure we are up to date. | |
doUpdateURIsFromDisk(); | |
if (disposed || connectionFailure != null) { | |
reconnectMutex.notifyAll(); | |
} | |
if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { | |
return false; | |
} else { | |
List<URI> connectList = getConnectList(); | |
if (connectList.isEmpty()) { | |
failure = new IOException("No uris available to connect to."); | |
} else { | |
if (doRebalance) { | |
if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { | |
// already connected to first in the list, no need to rebalance | |
doRebalance = false; | |
return false; | |
} else { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); | |
} | |
try { | |
Transport transport = this.connectedTransport.getAndSet(null); | |
if (transport != null) { | |
disposeTransport(transport); | |
} | |
} catch (Exception e) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Caught an exception stopping existing transport for rebalance", e); | |
} | |
} | |
} | |
doRebalance = false; | |
} | |
resetReconnectDelay(); | |
Transport transport = null; | |
URI uri = null; | |
// If we have a backup already waiting lets try it. | |
synchronized (backupMutex) { | |
if ((priorityBackup || backup) && !backups.isEmpty()) { | |
ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); | |
if (randomize) { | |
Collections.shuffle(l); | |
} | |
BackupTransport bt = l.remove(0); | |
backups.remove(bt); | |
transport = bt.getTransport(); | |
uri = bt.getUri(); | |
if (priorityBackup && priorityBackupAvailable) { | |
Transport old = this.connectedTransport.getAndSet(null); | |
if (old != null) { | |
disposeTransport(old); | |
} | |
priorityBackupAvailable = false; | |
} | |
} | |
} | |
// Sleep for the reconnectDelay if there's no backup and we aren't trying | |
// for the first time, or we were disposed for some reason. | |
if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { | |
synchronized (sleepMutex) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); | |
} | |
try { | |
sleepMutex.wait(reconnectDelay); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
Iterator<URI> iter = connectList.iterator(); | |
while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { | |
try { | |
SslContext.setCurrentSslContext(brokerSslContext); | |
// We could be starting with a backup and if so we wait to grab a | |
// URI from the pool until next time around. | |
if (transport == null) { | |
uri = addExtraQueryOptions(iter.next()); | |
transport = TransportFactory.compositeConnect(uri); | |
} | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); | |
} | |
transport.setTransportListener(myTransportListener); | |
transport.start(); | |
if (started && !firstConnection) { | |
restoreTransport(transport); | |
} | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Connection established"); | |
} | |
reconnectDelay = initialReconnectDelay; | |
connectedTransportURI = uri; | |
connectedTransport.set(transport); | |
connectedToPriority = isPriority(connectedTransportURI); | |
reconnectMutex.notifyAll(); | |
connectFailures = 0; | |
// Make sure on initial startup, that the transportListener | |
// has been initialized for this instance. | |
synchronized (listenerMutex) { | |
if (transportListener == null) { | |
try { | |
// if it isn't set after 2secs - it probably never will be | |
listenerMutex.wait(2000); | |
} catch (InterruptedException ex) { | |
} | |
} | |
} | |
if (transportListener != null) { | |
transportListener.transportResumed(); | |
} else { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("transport resumed by transport listener not set"); | |
} | |
} | |
if (firstConnection) { | |
firstConnection = false; | |
LOG.info("Successfully connected to " + uri); | |
} else { | |
LOG.info("Successfully reconnected to " + uri); | |
} | |
connected = true; | |
return false; | |
} catch (Exception e) { | |
failure = e; | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Connect fail to: " + uri + ", reason: " + e); | |
} | |
if (transport != null) { | |
try { | |
transport.stop(); | |
transport = null; | |
} catch (Exception ee) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Stop of failed transport: " + transport + | |
" failed with reason: " + ee); | |
} | |
} | |
} | |
} finally { | |
SslContext.setCurrentSslContext(null); | |
} | |
} | |
} | |
} | |
int reconnectLimit = calculateReconnectAttemptLimit(); | |
connectFailures++; | |
if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { | |
LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); | |
connectionFailure = failure; | |
// Make sure on initial startup, that the transportListener has been | |
// initialized for this instance. | |
synchronized (listenerMutex) { | |
if (transportListener == null) { | |
try { | |
listenerMutex.wait(2000); | |
} catch (InterruptedException ex) { | |
} | |
} | |
} | |
propagateFailureToExceptionListener(connectionFailure); | |
return false; | |
} | |
int warnInterval = getWarnAfterReconnectAttempts(); | |
if (warnInterval > 0 && (connectFailures % warnInterval) == 0) { | |
LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.", | |
uris, connectFailures); | |
} | |
} | |
if (!disposed) { | |
doDelay(); | |
} | |
return !disposed; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment