Skip to content

Instantly share code, notes, and snippets.

@MrMage
Last active September 26, 2022 12:14
Show Gist options
  • Save MrMage/6fe071f405ac2c1b8a4cde25f05061db to your computer and use it in GitHub Desktop.
Save MrMage/6fe071f405ac2c1b8a4cde25f05061db to your computer and use it in GitHub Desktop.
import Foundation
import Logging
import NIOConcurrencyHelpers
import NIO
import Async
import Service
public protocol CloseableResource: AnyObject {
var eventLoop: EventLoop { get }
var isClosed: Bool { get }
func close()
}
public struct GlobalResourcePoolConfig {
/// The number of resources that the pool should not fall below.
public let minResources: Int
/// The maximum number of resources allocated by the pool.
public let maxResources: Int
public let resourceRequestTimeout: TimeInterval
public let pruneInterval: TimeInterval
public let idleTimeForPruning: TimeInterval
public init(minResources: Int = 2, maxResources: Int = 10,
resourceRequestTimeout: TimeInterval = 5,
pruneInterval: TimeInterval = 60, idleTimeForPruning: TimeInterval = 120) {
self.minResources = minResources
self.maxResources = maxResources
self.resourceRequestTimeout = resourceRequestTimeout
self.pruneInterval = pruneInterval
self.idleTimeForPruning = idleTimeForPruning
}
}
public final class GlobalResourcePool<Resource>: Service where Resource: CloseableResource {
public enum Error: Swift.Error {
case timedOut
}
public typealias ResourceFactory = (EventLoop) -> Future<Resource>
private let resourceFactory: ResourceFactory
public let config: GlobalResourcePoolConfig
private let eventLoopGroup: EventLoopGroup
private let logger: Logger?
private var active: [PooledResource<Resource>]
private var available: [PooledResource<Resource>]
private let semaphore: DispatchSemaphore
private let lock: Lock
private let waitingQueue: DispatchQueue
public init(config: GlobalResourcePoolConfig, eventLoopGroup: EventLoopGroup, logger: Logger?,
resourceFactory: @escaping ResourceFactory) {
self.config = config
self.eventLoopGroup = eventLoopGroup
self.logger = logger
self.resourceFactory = resourceFactory
self.active = []
self.available = []
self.semaphore = DispatchSemaphore(value: config.maxResources)
self.lock = Lock()
self.waitingQueue = DispatchQueue(label: String(describing: type(of: self)) + ".waitingQueue")
self.pruneResources()
}
public func withResource<T>(
file: String = #file, function: String = #function, line: UInt = #line, column: UInt = #column,
_ closure: @escaping (Resource) throws -> Future<T>) -> Future<T> {
return requestResource().flatMap(to: T.self) { resource in
let resultFuture: Future<T>
do {
resultFuture = try closure(resource)
} catch {
resultFuture = resource.eventLoop.newFailedFuture(error: error)
}
return resultFuture
.catchMap { throw $0.annotated(SourceLocation(file: file, function: function, line: line, column: column, range: nil)) }
.always { self.releaseResource(resource) }
}
}
public func requestResource() -> Future<Resource> {
let promise = self.eventLoopGroup.next().newPromise(Resource.self)
waitingQueue.async {
guard self.semaphore.wait(
timeout: .now() + .milliseconds(Int(1000 * self.config.resourceRequestTimeout))) == .success else {
promise.fail(error: Error.timedOut)
return
}
if let availableResource = (self.lock.withLock { self.available.popLast() }) {
let currentResource = self.lock.withLock { availableResource.resource! }
if !currentResource.isClosed {
promise.succeed(result: currentResource)
} else {
self.logger?.info("[\(self)] Resource closed; re-creating")
self.resourceFactory(currentResource.eventLoop).map { createdResource in
self.lock.withLockVoid { availableResource.resource = createdResource }
return createdResource
}.cascade(promise: promise)
}
} else {
let newPooledResource = PooledResource<Resource>()
var numberOfActiveResources = 0
self.lock.withLockVoid {
self.active.append(newPooledResource)
numberOfActiveResources = self.active.count
}
self.logger?.info("[\(self)] Allocated new resource (\(numberOfActiveResources)/\(self.config.maxResources))")
let eventLoop = self.eventLoopGroup.next()
self.resourceFactory(eventLoop).map { createdResource in
self.lock.withLockVoid { newPooledResource.resource = createdResource }
return createdResource
}.cascade(promise: promise)
}
}
return promise.futureResult
}
public func releaseResource(_ resource: Resource) {
guard let pooledResource = (lock.withLock { active.first { $0.resource === resource } }) else {
assertionFailure("Attempted to release a connection to a pool that did not create it.")
return
}
let now = Date()
lock.withLockVoid {
pooledResource.lastUsed = now
available.append(pooledResource)
semaphore.signal()
}
}
private func pruneResources() {
lock.withLockVoid {
let now = Date()
let oldActiveCount = active.count
var toPrune = Set<ObjectIdentifier>()
for pooledResource in available {
if pooledResource.resource.isClosed
|| (now.timeIntervalSince(pooledResource.lastUsed) >= config.idleTimeForPruning
&& (active.count - toPrune.count) > config.minResources) {
pooledResource.resource.close()
toPrune.insert(ObjectIdentifier(pooledResource))
}
}
available = available.filter { !toPrune.contains(ObjectIdentifier($0)) }
active = active.filter { !toPrune.contains(ObjectIdentifier($0)) }
logger?.info("[\(self)] Prune: \(oldActiveCount) active resources before, \(active.count) now.")
}
_ = eventLoopGroup.next().scheduleTask(in: .milliseconds(Int(1000 * config.pruneInterval))) { [weak self] in
self?.pruneResources()
}
}
/// Closes all connections that currently not in use, or all connections altogether.
public func drain(includeUnavailable: Bool) {
lock.withLockVoid {
var toPrune = Set<ObjectIdentifier>()
for pooledResource in (includeUnavailable ? active : available) {
pooledResource.resource.close()
toPrune.insert(ObjectIdentifier(pooledResource))
}
available = available.filter { !toPrune.contains(ObjectIdentifier($0)) }
active = active.filter { !toPrune.contains(ObjectIdentifier($0)) }
logger?.info("[\(self)] Drain: Purged \(toPrune.count) resources.")
}
}
}
private final class PooledResource<Resource> where Resource: CloseableResource {
var resource: Resource!
var lastUsed = Date()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment