Skip to content

Instantly share code, notes, and snippets.

Last active April 3, 2016 01:51
Show Gist options
  • Save scottbyrns/aa5c4657e56683272415e039a87e0488 to your computer and use it in GitHub Desktop.
Save scottbyrns/aa5c4657e56683272415e039a87e0488 to your computer and use it in GitHub Desktop.
Swift Connection Pool
import C7
import Venice
import TCP
/// Errors reported by a connection pool.
///
/// Case names (including the historical spelling of `maxErrorDurationExeceeded`)
/// are part of the public interface and are kept unchanged.
public enum PoolError : ErrorProtocol {
    /// Not thrown by the pool code visible in this file.
    case tooBusy
    /// Not thrown by the pool code visible in this file.
    case empty
    /// No connection became available within the pool's `connectionWait`.
    case timeout
    /// A connection's accumulated error duration went past `maxErrorDuration`.
    case maxErrorDurationExeceeded
    /// Not thrown by the pool code visible in this file.
    case maxWaitDurationExceeded
    /// A disconnected connection stayed unavailable for longer than `maxReconnectDuration`.
    case maxUnavailableDurationExceeded
}
/// A container that hands out reusable values ("poolables") and takes them back.
protocol Pool {
    /// The kind of value managed by the pool.
    associatedtype Poolable

    /// Hands out a poolable, or `nil` when none can be provided.
    func borrow () -> Poolable?

    /// Gives a previously borrowed poolable back to the pool.
    func takeBack (poolable: Poolable)

    /// Runs `handler` with a poolable chosen by the pool.
    ///
    /// - Parameter handler: Work to perform with the poolable. It may throw.
    /// - Throws: Whatever the conforming pool decides to report.
    func with (handler: (poolable: Poolable) throws -> Any?) throws
}
/// A pool of reference-type connections.
///
/// Every pooled connection sits in exactly one of these lists:
/// - `idle`: ready to be handed out,
/// - `lent`: handed out through `borrow()` and not yet given back,
/// - `active`: currently running a handler inside `with(handler:)`,
/// - `suspect`: the last handler run on it threw while it stayed open,
/// - `pendingRetry`: found closed and waiting to become usable again.
///
/// Per-connection bookkeeping is keyed by the connection's object identity.
/// The class performs no locking of its own.
public class ConnectionPool<PoolConnection : Connection where PoolConnection : AnyObject> : Pool {
    /// Error budget per connection; going past it discards the connection.
    var maxErrorDuration : Duration
    /// Time waited between attempts, and the amount charged to a connection per failure.
    var retryDelay : Duration
    /// How long `with(handler:)` may wait for a connection before timing out.
    var connectionWait : Duration
    /// How long a closed connection may stay unavailable before it is discarded.
    var maxReconnectDuration : Duration

    private var active = [PoolConnection]()
    private var lent = [PoolConnection]()
    private var idle = [PoolConnection]()
    private var suspect = [PoolConnection]()
    private var pendingRetry = [PoolConnection]()

    /// Accumulated error duration per connection.
    private var errorDurations = [Int: Duration]()
    /// Per-connection error log. It is created and deleted here but nothing appends to it yet.
    private var connectionErrors = [Int: [PoolError]]()
    /// Value of `now` at the moment a connection was last seen closed (0 = never).
    private var disconnectedDuration = [Int: Int64]()

    /// Creates a pool in which every connection of `pool` starts out idle.
    ///
    /// - Parameters:
    ///   - pool: The connections to manage.
    ///   - maxErrorDuration: Stored as `maxErrorDuration`.
    ///   - retryDelay: Stored as `retryDelay`.
    ///   - connectionWait: Stored as `connectionWait`.
    ///   - maxReconnectDuration: Stored as `maxReconnectDuration`.
    public init (using pool: [PoolConnection], toleratingErrorsFor maxErrorDuration: Duration, delayingRetryFor retryDelay: Duration, waitingForConnectionNoMoreThan connectionWait: Duration, discardingIfUnavailableForMoreThan maxReconnectDuration: Duration) {
        idle = pool
        self.maxErrorDuration = maxErrorDuration
        self.retryDelay = retryDelay
        self.connectionWait = connectionWait
        self.maxReconnectDuration = maxReconnectDuration

        for connection in pool {
            let key = bookkeepingKey(connection)
            errorDurations[key] = 0.millisecond
            disconnectedDuration[key] = 0
            connectionErrors[key] = [PoolError]()
        }
    }

    /// Key under which the bookkeeping dictionaries store data for `connection`.
    private func bookkeepingKey (connection: PoolConnection) -> Int {
        return ObjectIdentifier(connection).hashValue
    }

    /// Builds a predicate matching exactly `connection` by object identity.
    private func createPredicate (connection: PoolConnection) -> (PoolConnection) -> Bool {
        return { $0 === connection }
    }

    /// Drops `connection` from every list and deletes its bookkeeping entries.
    private func forget (connection: PoolConnection) {
        let predicate = createPredicate(connection)
        if let index = active.index(where: predicate) {
            active.remove(at: index)
        }
        if let index = idle.index(where: predicate) {
            idle.remove(at: index)
        }
        if let index = lent.index(where: predicate) {
            lent.remove(at: index)
        }
        if let index = suspect.index(where: predicate) {
            suspect.remove(at: index)
        }
        if let index = pendingRetry.index(where: predicate) {
            pendingRetry.remove(at: index)
        }

        let key = bookkeepingKey(connection)
        errorDurations.removeValue(forKey: key)
        disconnectedDuration.removeValue(forKey: key)
        connectionErrors.removeValue(forKey: key)
    }

    /// Closes `connection` and, if closing succeeded, removes it from the pool.
    ///
    /// - Returns: The result of `connection.close()`. When it is `false` the
    ///   pool is left untouched.
    public func remove(connection : PoolConnection) -> Bool {
        let closed = connection.close()
        if closed {
            forget(connection)
        }
        return closed
    }

    /// Hands out the first idle connection and records it as lent.
    ///
    /// - Returns: A connection, or `nil` when no connection is idle.
    public func borrow() -> PoolConnection? {
        guard let connection = idle.first else {
            return nil
        }
        idle.remove(at: 0)
        // Track the loan so that takeBack can recognise the connection later.
        lent.append(connection)
        return connection
    }

    /// Returns a connection obtained from `borrow()` to the idle list.
    ///
    /// Connections that are not currently lent out are ignored.
    public func takeBack (connection: PoolConnection) {
        if let index = lent.index(where: createPredicate(connection)) {
            lent.remove(at: index)
            idle.append(connection)
        }
    }

    /// Takes the first idle connection, if any.
    private func nextIdleConnection() -> PoolConnection? {
        return idle.isEmpty ? nil : idle.removeFirst()
    }

    /// Takes the first suspect connection, if any.
    private func nextSuspectConnection() -> PoolConnection? {
        return suspect.isEmpty ? nil : suspect.removeFirst()
    }

    /// Takes the first connection waiting for a retry, provided it is usable again.
    ///
    /// A connection that is still closed is moved to the back of the queue and
    /// `nil` is returned, unless it has been closed for longer than
    /// `maxReconnectDuration`; in that case it is dropped from the pool.
    ///
    /// NOTE(review): the original wrapped an empty `do` block here, so no
    /// reconnect was ever attempted. Presumably a reopen call on the connection
    /// belongs at this point; until one is added the pool only observes `closed`.
    ///
    /// - Throws: `PoolError.maxUnavailableDurationExceeded` when the connection
    ///   is discarded.
    private func nextPendingConnection() throws -> PoolConnection? {
        guard let connection = pendingRetry.first else {
            return nil
        }
        pendingRetry.remove(at: 0)

        guard connection.closed else {
            return connection
        }

        if let disconnectedAt = disconnectedDuration[bookkeepingKey(connection)] {
            if now - disconnectedAt > maxReconnectDuration {
                forget(connection)
                throw PoolError.maxUnavailableDurationExceeded
            }
        }

        // Keep the original disconnect time so the outage keeps accumulating,
        // and keep the connection queued for the next attempt.
        pendingRetry.append(connection)
        return nil
    }

    /// Picks the next candidate: idle first, then suspect, then pending retry.
    private func nextConnection() throws -> PoolConnection? {
        if !idle.isEmpty {
            return nextIdleConnection()
        }
        else if !suspect.isEmpty {
            return nextSuspectConnection()
        }
        else if !pendingRetry.isEmpty {
            return try nextPendingConnection()
        }
        return nil
    }

    /// Removes `connection` from the active list.
    private func doneWith(connection : PoolConnection) {
        if let index = active.index(where: createPredicate(connection)) {
            active.remove(at: index)
        }
    }

    /// Charges one `retryDelay` of error time to `connection`.
    ///
    /// A closed connection additionally gets its disconnect time set to `now`.
    /// An open connection whose accumulated error time exceeds
    /// `maxErrorDuration` is closed and dropped from the pool.
    /// Connections without bookkeeping are ignored.
    ///
    /// - Throws: `PoolError.maxErrorDurationExeceeded` when the connection is dropped.
    private func logFailure(connection : PoolConnection) throws {
        let key = bookkeepingKey(connection)
        guard let previousErrorDuration = errorDurations[key] else {
            return
        }

        let totalErrorDuration = previousErrorDuration + retryDelay
        errorDurations[key] = totalErrorDuration

        if connection.closed {
            disconnectedDuration[key] = now
        }
        else if totalErrorDuration > maxErrorDuration {
            // Failed for too long: remove it from the pool before reporting.
            _ = connection.close()
            forget(connection)
            throw PoolError.maxErrorDurationExeceeded
        }
    }

    /// Resets the accumulated error time of a tracked connection.
    private func logSuccess(connection : PoolConnection) {
        let key = bookkeepingKey(connection)
        if errorDurations[key] != nil {
            errorDurations[key] = 0.millisecond
        }
    }

    /// Runs `handler` once with a connection from the pool.
    ///
    /// While no connection is available the call waits `retryDelay` at a time.
    /// Connections found closed are queued for retry and another one is tried.
    /// After the handler ran, the connection goes back to the idle list on
    /// success; on failure it is queued as suspect (or for retry when it got
    /// closed).
    ///
    /// NOTE(review): as in the original, an error thrown by `handler` itself is
    /// recorded but not rethrown, and the handler's return value is discarded.
    /// With a `retryDelay` of zero the wait never accumulates, so the timeout
    /// cannot trigger.
    ///
    /// - Parameter handler: Work to perform with the connection.
    /// - Throws: `PoolError.timeout` when the wait exceeds `connectionWait`,
    ///   `PoolError.maxErrorDurationExeceeded` when the connection used up its
    ///   error budget, `PoolError.maxUnavailableDurationExceeded` when a closed
    ///   connection was discarded while looking for a candidate.
    public func with(handler: (poolable: PoolConnection) throws -> Any?) throws {
        var nappedTime : Duration = 0.millisecond

        while true {
            guard let connection = try nextConnection() else {
                // We waited longer than permitted for a connection.
                // Throw a timeout for the user to handle.
                if connectionWait < nappedTime {
                    throw PoolError.timeout
                }
                // NOTE(review): `nap(for:)` is expected to be Venice's sleep
                // call; confirm the name against the Venice version in use.
                nap(for: retryDelay)
                nappedTime += retryDelay
                continue
            }

            guard !connection.closed else {
                try logFailure(connection)
                pendingRetry.append(connection)
                continue
            }

            active.append(connection)
            do {
                _ = try handler(poolable: connection)
            }
            catch {
                doneWith(connection)
                // Throws, after dropping the connection, once the error budget is used up.
                try logFailure(connection)
                if connection.closed {
                    pendingRetry.append(connection)
                }
                else {
                    suspect.append(connection)
                }
                return
            }

            doneWith(connection)
            logSuccess(connection)
            idle.append(connection)
            return
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment