Last active
September 26, 2022 12:14
-
-
Save MrMage/6fe071f405ac2c1b8a4cde25f05061db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Logging | |
import NIOConcurrencyHelpers | |
import NIO | |
import Async | |
import Service | |
public protocol CloseableResource: AnyObject { | |
var eventLoop: EventLoop { get } | |
var isClosed: Bool { get } | |
func close() | |
} | |
public struct GlobalResourcePoolConfig { | |
/// The number of resources that the pool should not fall below. | |
public let minResources: Int | |
/// The maximum number of resources allocated by the pool. | |
public let maxResources: Int | |
public let resourceRequestTimeout: TimeInterval | |
public let pruneInterval: TimeInterval | |
public let idleTimeForPruning: TimeInterval | |
public init(minResources: Int = 2, maxResources: Int = 10, | |
resourceRequestTimeout: TimeInterval = 5, | |
pruneInterval: TimeInterval = 60, idleTimeForPruning: TimeInterval = 120) { | |
self.minResources = minResources | |
self.maxResources = maxResources | |
self.resourceRequestTimeout = resourceRequestTimeout | |
self.pruneInterval = pruneInterval | |
self.idleTimeForPruning = idleTimeForPruning | |
} | |
} | |
public final class GlobalResourcePool<Resource>: Service where Resource: CloseableResource { | |
public enum Error: Swift.Error { | |
case timedOut | |
} | |
public typealias ResourceFactory = (EventLoop) -> Future<Resource> | |
private let resourceFactory: ResourceFactory | |
public let config: GlobalResourcePoolConfig | |
private let eventLoopGroup: EventLoopGroup | |
private let logger: Logger? | |
private var active: [PooledResource<Resource>] | |
private var available: [PooledResource<Resource>] | |
private let semaphore: DispatchSemaphore | |
private let lock: Lock | |
private let waitingQueue: DispatchQueue | |
public init(config: GlobalResourcePoolConfig, eventLoopGroup: EventLoopGroup, logger: Logger?, | |
resourceFactory: @escaping ResourceFactory) { | |
self.config = config | |
self.eventLoopGroup = eventLoopGroup | |
self.logger = logger | |
self.resourceFactory = resourceFactory | |
self.active = [] | |
self.available = [] | |
self.semaphore = DispatchSemaphore(value: config.maxResources) | |
self.lock = Lock() | |
self.waitingQueue = DispatchQueue(label: String(describing: type(of: self)) + ".waitingQueue") | |
self.pruneResources() | |
} | |
public func withResource<T>( | |
file: String = #file, function: String = #function, line: UInt = #line, column: UInt = #column, | |
_ closure: @escaping (Resource) throws -> Future<T>) -> Future<T> { | |
return requestResource().flatMap(to: T.self) { resource in | |
let resultFuture: Future<T> | |
do { | |
resultFuture = try closure(resource) | |
} catch { | |
resultFuture = resource.eventLoop.newFailedFuture(error: error) | |
} | |
return resultFuture | |
.catchMap { throw $0.annotated(SourceLocation(file: file, function: function, line: line, column: column, range: nil)) } | |
.always { self.releaseResource(resource) } | |
} | |
} | |
public func requestResource() -> Future<Resource> { | |
let promise = self.eventLoopGroup.next().newPromise(Resource.self) | |
waitingQueue.async { | |
guard self.semaphore.wait( | |
timeout: .now() + .milliseconds(Int(1000 * self.config.resourceRequestTimeout))) == .success else { | |
promise.fail(error: Error.timedOut) | |
return | |
} | |
if let availableResource = (self.lock.withLock { self.available.popLast() }) { | |
let currentResource = self.lock.withLock { availableResource.resource! } | |
if !currentResource.isClosed { | |
promise.succeed(result: currentResource) | |
} else { | |
self.logger?.info("[\(self)] Resource closed; re-creating") | |
self.resourceFactory(currentResource.eventLoop).map { createdResource in | |
self.lock.withLockVoid { availableResource.resource = createdResource } | |
return createdResource | |
}.cascade(promise: promise) | |
} | |
} else { | |
let newPooledResource = PooledResource<Resource>() | |
var numberOfActiveResources = 0 | |
self.lock.withLockVoid { | |
self.active.append(newPooledResource) | |
numberOfActiveResources = self.active.count | |
} | |
self.logger?.info("[\(self)] Allocated new resource (\(numberOfActiveResources)/\(self.config.maxResources))") | |
let eventLoop = self.eventLoopGroup.next() | |
self.resourceFactory(eventLoop).map { createdResource in | |
self.lock.withLockVoid { newPooledResource.resource = createdResource } | |
return createdResource | |
}.cascade(promise: promise) | |
} | |
} | |
return promise.futureResult | |
} | |
public func releaseResource(_ resource: Resource) { | |
guard let pooledResource = (lock.withLock { active.first { $0.resource === resource } }) else { | |
assertionFailure("Attempted to release a connection to a pool that did not create it.") | |
return | |
} | |
let now = Date() | |
lock.withLockVoid { | |
pooledResource.lastUsed = now | |
available.append(pooledResource) | |
semaphore.signal() | |
} | |
} | |
private func pruneResources() { | |
lock.withLockVoid { | |
let now = Date() | |
let oldActiveCount = active.count | |
var toPrune = Set<ObjectIdentifier>() | |
for pooledResource in available { | |
if pooledResource.resource.isClosed | |
|| (now.timeIntervalSince(pooledResource.lastUsed) >= config.idleTimeForPruning | |
&& (active.count - toPrune.count) > config.minResources) { | |
pooledResource.resource.close() | |
toPrune.insert(ObjectIdentifier(pooledResource)) | |
} | |
} | |
available = available.filter { !toPrune.contains(ObjectIdentifier($0)) } | |
active = active.filter { !toPrune.contains(ObjectIdentifier($0)) } | |
logger?.info("[\(self)] Prune: \(oldActiveCount) active resources before, \(active.count) now.") | |
} | |
_ = eventLoopGroup.next().scheduleTask(in: .milliseconds(Int(1000 * config.pruneInterval))) { [weak self] in | |
self?.pruneResources() | |
} | |
} | |
/// Closes all connections that currently not in use, or all connections altogether. | |
public func drain(includeUnavailable: Bool) { | |
lock.withLockVoid { | |
var toPrune = Set<ObjectIdentifier>() | |
for pooledResource in (includeUnavailable ? active : available) { | |
pooledResource.resource.close() | |
toPrune.insert(ObjectIdentifier(pooledResource)) | |
} | |
available = available.filter { !toPrune.contains(ObjectIdentifier($0)) } | |
active = active.filter { !toPrune.contains(ObjectIdentifier($0)) } | |
logger?.info("[\(self)] Drain: Purged \(toPrune.count) resources.") | |
} | |
} | |
} | |
private final class PooledResource<Resource> where Resource: CloseableResource { | |
var resource: Resource! | |
var lastUsed = Date() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment