Last active
April 3, 2016 01:51
-
-
Save scottbyrns/aa5c4657e56683272415e039a87e0488 to your computer and use it in GitHub Desktop.
Swift Connection Pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import C7 | |
import Venice | |
import TCP | |
public enum PoolError : ErrorProtocol { | |
case tooBusy, empty, timeout, maxErrorDurationExeceeded, maxWaitDurationExceeded, maxUnavailableDurationExceeded | |
} | |
protocol Pool { | |
associatedtype Poolable | |
func borrow () -> Poolable? | |
func takeBack (poolable: Poolable) | |
func with (handler: (poolable: Poolable) throws -> Any?) throws | |
} | |
public class ConnectionPool<PoolConnection : Connection where PoolConnection : AnyObject> : Pool { | |
var maxErrorDuration : Duration | |
var retryDelay : Duration | |
var connectionWait : Duration | |
var maxReconnectDuration : Duration | |
private var active = [PoolConnection]() | |
private var lent = [PoolConnection]() | |
private var idle = [PoolConnection]() | |
private var suspect = [PoolConnection]() | |
private var pendingRetry = [PoolConnection]() | |
private var errorDurations = [Int: Duration]() | |
private var connectionErrors = [Int: Array<PoolError>]() | |
private var disconnectedDuration = [Int: Int64]() | |
public init (using pool: [PoolConnection], toleratingErrorsFor maxErrorDuration: Duration, delayingRetryFor retryDelay: Duration, waitingForConnectionNoMoreThan connectionWait: Duration, discardingIfUnavailableForMoreThan maxReconnectDuration: Duration) { | |
idle = pool | |
self.maxErrorDuration = maxErrorDuration | |
self.retryDelay = retryDelay | |
self.connectionWait = connectionWait | |
self.maxReconnectDuration = maxReconnectDuration | |
for connection in pool { | |
errorDurations[ObjectIdentifier(connection).hashValue] = 0 | |
disconnectedDuration[ObjectIdentifier(connection).hashValue] = 0.millisecond | |
connectionErrors[ObjectIdentifier(connection).hashValue] = Array<PoolError>() | |
} | |
} | |
private func createPredicate (connection: PoolConnection) -> (PoolConnection) -> Bool { | |
return { $0 === connection } | |
} | |
public func remove(connection : PoolConnection) -> Bool { | |
let closed = connection.close() | |
if closed { | |
let predicate = createPredicate(connection) | |
if let index = active.index(where: predicate) { | |
active.remove(at: index) | |
} | |
else if let index = idle.index(where: predicate) { | |
idle.remove(at: index) | |
} | |
else if let index = lent.index(where: predicate) { | |
lent.remove(at: index) | |
} | |
errorDurations.removeValue(forKey: ObjectIdentifier(connection).hashValue) | |
disconnectedDuration.removeValue(forKey: ObjectIdentifier(connection).hashValue) | |
connectionErrors.removeValue(forKey: ObjectIdentifier(connection).hashValue) | |
} | |
return closed | |
} | |
public func borrow() -> PoolConnection? { | |
guard idle.count > 0 else { | |
return nil | |
} | |
guard let connection = idle.first else { | |
return nil | |
} | |
idle.remove(at: 0) | |
lent.append(connection) | |
return connection | |
} | |
public func takeBack (connection: PoolConnection) { | |
if let index = lent.index(where: createPredicate(connection)) { | |
lent.remove(at: index) | |
idle.append(connection) | |
} | |
} | |
private func nextIdleConnection() throws -> PoolConnection? { | |
guard let connection = idle.first else { | |
return nil | |
} | |
idle.remove(at: 0) | |
active.append(connection) | |
return connection | |
} | |
private func nextSuspectConnection() throws -> PoolConnection? { | |
guard let connection = suspect.first else { | |
return nil | |
} | |
suspect.remove(at: 0) | |
active.append(connection) | |
return connection | |
} | |
private func nextPendingConnection() throws -> PoolConnection? { | |
if let connection = pendingRetry.first { | |
pendingRetry.remove(at: 0) | |
do { | |
try connection.open() | |
} | |
catch { | |
if let index = disconnectedDuration.index(forKey: ObjectIdentifier(connection).hashValue) { | |
if now - disconnectedDuration[index].value > maxReconnectDuration { | |
remove(connection) | |
throw PoolError.maxUnavailableDurationExceeded | |
} | |
} | |
else { | |
disconnectedDuration.updateValue(now, forKey: ObjectIdentifier(connection).hashValue) | |
pendingRetry.append(connection) | |
} | |
return nil | |
} | |
active.append(connection) | |
return connection | |
} | |
return nil | |
} | |
private func nextConnection() throws -> PoolConnection? { | |
if idle.count > 0 { | |
return try nextIdleConnection() | |
} | |
else if suspect.count > 0 { | |
return try nextSuspectConnection() | |
} | |
else if pendingRetry.count > 0 { | |
return try nextPendingConnection() | |
} | |
return nil | |
} | |
private func doneWith(connection : PoolConnection) { | |
if let index = active.index(where: createPredicate(connection)) { | |
active.remove(at: index) | |
idle.append(connection) | |
} | |
} | |
private func logFailure(connection : PoolConnection) throws { | |
// if failed too many times remove from pool | |
if let index = errorDurations.index(forKey: ObjectIdentifier(connection).hashValue) { | |
let totalErrorDuration = errorDurations[index].value + retryDelay | |
errorDurations.updateValue(totalErrorDuration, forKey: ObjectIdentifier(connection).hashValue) | |
if connection.closed { | |
pendingRetry.append(connection) | |
disconnectedDuration.updateValue(now, forKey: ObjectIdentifier(connection).hashValue) | |
} | |
else if totalErrorDuration > maxErrorDuration { | |
suspect.append(connection) | |
throw PoolError.maxErrorDurationExeceeded | |
} | |
} | |
doneWith(connection) | |
} | |
private func logSuccess(connection : PoolConnection) { | |
if let _ = errorDurations.index(forKey: ObjectIdentifier(connection).hashValue) { | |
errorDurations.updateValue(0.millisecond, forKey: ObjectIdentifier(connection).hashValue) | |
} | |
} | |
public func with(handler: (poolable: PoolConnection) throws -> Any?) throws { | |
var hasExecuted = false | |
var nappedTime : Duration = 0.millisecond | |
while !hasExecuted { | |
do { | |
guard let connection = try self.nextConnection() else { | |
// We waited longer than permitted for a connections. | |
// Throw a timeout for the user to handle. | |
if self.connectionWait < nappedTime { | |
throw PoolError.timeout | |
} | |
nap(retryDelay) | |
nappedTime += retryDelay | |
continue | |
} | |
guard !connection.closed else { | |
try self.logFailure(connection) | |
continue | |
} | |
do { | |
try handler(poolable: connection) | |
self.logSuccess(connection) | |
self.doneWith(connection) | |
} | |
catch { | |
try self.logFailure(connection) | |
} | |
hasExecuted = true | |
} | |
catch {} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment