Skip to content

Instantly share code, notes, and snippets.

@Jonathan-Mckenzie
Last active June 14, 2021 15:39
Show Gist options
  • Save Jonathan-Mckenzie/62206f1da7896bc60c2857f1a21436e1 to your computer and use it in GitHub Desktop.
Save Jonathan-Mckenzie/62206f1da7896bc60c2857f1a21436e1 to your computer and use it in GitHub Desktop.
Vapor Request+AsyncRetry recursively retry async work until succeeded or timeout
import Foundation
import Vapor
/// Request+AsyncRetry
/// Asynchronously retries a piece of work until it succeeds or the attempt budget is exhausted
///
/// - Parameters:
/// - maxRetryCount: the maximum number of attempts (including the first) before giving up
/// - usDelayTimeout: number of microseconds to delay between each failed attempt
/// - job: async function to perform the work
/// - Returns: future successful or failed result
extension Request {
    /// Retries `job` with the default policy: at most 50 attempts, 100 ms apart.
    ///
    /// - Parameter job: Produces the future to wait on. It is invoked on the
    ///   application's thread pool, once per attempt.
    /// - Returns: A future on this request's event loop that succeeds with the first
    ///   successful result of `job`, or fails with `Abort(.internalServerError)` once
    ///   every attempt has failed.
    /// - Throws: Never throws synchronously; `throws` is kept for source compatibility.
    public func asyncRetry<T>(job: @escaping () throws -> EventLoopFuture<T>) throws -> EventLoopFuture<T> {
        try self.asyncRetry(
            maxRetryCount: 50,
            usDelayTimeout: 100 * 1000, // 100 ms between attempts
            job: job
        )
    }

    /// Runs `job` repeatedly until it succeeds or `maxRetryCount` attempts have failed.
    ///
    /// An attempt counts as failed when the thread pool refuses the work, when `job`
    /// throws, or when the future returned by `job` fails. The delay between attempts
    /// is scheduled on the request's event loop, so no thread is blocked while waiting.
    ///
    /// - Parameters:
    ///   - maxRetryCount: Maximum number of attempts, including the first one. Values
    ///     below 1 still result in a single attempt.
    ///   - usDelayTimeout: Delay, in microseconds, between a failed attempt and the next.
    ///   - job: Produces the future to wait on. It is invoked on the application's
    ///     thread pool, once per attempt.
    /// - Returns: A future on this request's event loop that succeeds with the first
    ///   successful result of `job`, or fails with `Abort(.internalServerError)` once
    ///   the attempts are exhausted.
    /// - Throws: Never throws synchronously; `throws` is kept for source compatibility.
    public func asyncRetry<T>(maxRetryCount: Int, usDelayTimeout: useconds_t, job: @escaping () throws -> EventLoopFuture<T>) throws -> EventLoopFuture<T> {
        let promise = self.eventLoop.makePromise(of: T.self)
        let retryDelay = TimeAmount.microseconds(Int64(usDelayTimeout))

        // `attempt` is 1-based. Every callback below runs on `self.eventLoop`, so the
        // retry state is never touched from more than one thread.
        func performJob(attempt: Int) {
            /// Fails the promise and returns `true` when no attempts remain.
            func giveUpIfExhausted() -> Bool {
                guard attempt >= maxRetryCount else {
                    return false
                }
                self.logger.error("[Request+asyncRetry] failed to complete work after retrying.")
                promise.fail(Abort(.internalServerError))
                return true
            }

            /// Schedules the next attempt after the configured delay without blocking.
            func scheduleRetry() {
                let scheduled = self.eventLoop.scheduleTask(in: retryDelay) {
                    performJob(attempt: attempt + 1)
                }
                // If the retry can never run (e.g. the event loop is shutting down),
                // fail the promise instead of leaving it unfulfilled.
                scheduled.futureResult.whenFailure { error in
                    promise.fail(error)
                }
            }

            // Capture a thrown error as a value so that a throwing job can be told
            // apart from the thread pool rejecting the work.
            let submission: EventLoopFuture<Result<EventLoopFuture<T>, Error>> =
                self.application.threadPool.runIfActive(eventLoop: self.eventLoop) {
                    Result(catching: { try job() })
                }

            submission.whenComplete { outcome in
                switch outcome {
                case .failure(let error):
                    self.logger.info("[Request+asyncRetry] failed to run on threadpool", metadata: ["error": "\(error)"])
                    guard !giveUpIfExhausted() else {
                        return
                    }
                    scheduleRetry()

                case .success(.failure(let error)):
                    self.logger.info("[Request+asyncRetry] job threw an exception", metadata: ["error": "\(error)"])
                    guard !giveUpIfExhausted() else {
                        return
                    }
                    scheduleRetry()

                case .success(.success(let future)):
                    // The job's future may belong to another event loop; hop back so
                    // that completion handling stays on the request's loop.
                    future.hop(to: self.eventLoop).whenComplete { result in
                        switch result {
                        case .success(let response):
                            promise.succeed(response)
                        case .failure(let error):
                            guard !giveUpIfExhausted() else {
                                return
                            }
                            self.logger.info("[Request+asyncRetry] work failed, retrying...", metadata: ["error": "\(error)"])
                            scheduleRetry()
                        }
                    }
                }
            }
        }

        performJob(attempt: 1)
        return promise.futureResult
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment