Skip to content

Instantly share code, notes, and snippets.

@mikaelbartlett
Last active May 18, 2020 22:42
Show Gist options
  • Save mikaelbartlett/033602f74bbe67545a982c9a9ec70162 to your computer and use it in GitHub Desktop.
Save mikaelbartlett/033602f74bbe67545a982c9a9ec70162 to your computer and use it in GitHub Desktop.
import Foundation
import Vapor
final class TokenBucket {
public let capacity: Int
public var tokenCount: Int
let workQueue = DispatchQueue(label: "com.midjar.resourcetokenbucket")
public init(capacity: Int, initialTokenCount: Int = 0) {
self.capacity = capacity
self.tokenCount = min(capacity, initialTokenCount)
}
public func consume(_ count: Int, on eventLoop: EventLoop) -> EventLoopFuture<Void> {
guard count <= capacity else {
fatalError("Cannot consume \(count) amount of tokens on a bucket with capacity \(capacity)")
}
return tryConsume(count, until: Date.distantFuture, on: eventLoop).map { _ in Void() }
}
public func tryConsume(_ count: Int, until limitDate: Date, on eventLoop: EventLoop) -> EventLoopFuture<Bool> {
guard count <= capacity else {
fatalError("Cannot consume \(count) amount of tokens on a bucket with capacity \(capacity)")
}
let promise = eventLoop.makePromise(of: Bool.self)
workQueue.async {
let result = self.wait(until: limitDate, for: count)
promise.succeed(result)
}
return promise.futureResult
}
private let condition = NSCondition()
func replenish(_ count: Int) {
condition.lock()
tokenCount = min(tokenCount + count, capacity)
condition.unlock()
}
private func wait(until limitDate: Date, for tokens: Int) -> Bool {
condition.lock()
defer {
condition.unlock()
}
while tokenCount < tokens {
if limitDate < Date() {
return false
}
_ = condition.wait(until: Date().addingTimeInterval(0.2))
}
tokenCount -= tokens
return true
}
}
/// Example usage with bucket what constraints to 3 tokens.
/// Need to call replenish on the token bucket to "give back" the token so the next
/// in line can continue
let tokenBucket = ResourceTokenBucket(capacity: 3, initialTokenCount: 3)
func downloadFile(using context: QueueContext,
authentication: Authentication,
path: String,
filename: String) throws -> EventLoopFuture<Result>? {
let filePath = "\(path)\(filename)"
let request = try HTTPClient.Request(url: filePath)
let downloadFilePath = tempFile(filename: filename)
guard !FileManager.default.fileExists(atPath: downloadFilePath) else { return nil }
let delegate = FileWriterResponseDelegate(request: request, filePath: downloadFilePath)
return tokenBucket.consume(1, on: context.eventLoop).flatMap { _ -> EventLoopFuture<Result> in
let result = context.application.http.client.shared.execute(request: request, delegate: delegate, eventLoop: .indifferent)
return result.futureResult.map {
if case FileWriterResponseDelegate.State.error(let error) = delegate.state {
context.logger.error("\(error)")
}
self.tokenBucket.replenish(1)
return $0
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment