Last active
June 14, 2021 15:39
-
-
Save Jonathan-Mckenzie/62206f1da7896bc60c2857f1a21436e1 to your computer and use it in GitHub Desktop.
Vapor Request+AsyncRetry recursively retry async work until succeeded or timeout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Vapor | |
/// Request+AsyncRetry | |
/// Asynchronously retries a piece of work until it succeeds or the attempt limit is reached
/// | |
/// - Parameters: | |
/// - maxRetryCount: the total number of attempts (including the first one) before giving up
/// - usDelayTimeout: number of microseconds to delay between each failed attempt | |
/// - job: async function to perform the work | |
/// - Returns: future successful or failed result | |
extension Request {
    /// Retries `job` with the default policy: at most 50 attempts, waiting 100 ms after each failure.
    ///
    /// - Parameter job: Produces the future to wait on. It is invoked on the application's
    ///   thread pool, once per attempt.
    /// - Returns: A future that succeeds with the first successful result of `job`, or fails with
    ///   `Abort(.internalServerError)` once every attempt has failed.
    public func asyncRetry<T>(job: @escaping () throws -> EventLoopFuture<T>) throws -> EventLoopFuture<T> {
        try self.asyncRetry(
            maxRetryCount: 50,
            usDelayTimeout: 100 * 1000, // 100 ms between attempts
            job: job
        )
    }

    /// Runs `job` repeatedly until it succeeds or the attempt budget is used up.
    ///
    /// An attempt counts as failed when the thread pool rejects the work, when `job` throws,
    /// or when the future returned by `job` fails. After a failed attempt the next one is
    /// scheduled on the request's event loop after `usDelayTimeout` microseconds; no thread
    /// is blocked while waiting.
    ///
    /// - Parameters:
    ///   - maxRetryCount: Total number of attempts, including the first one. Values of 1 or
    ///     less result in a single attempt.
    ///   - usDelayTimeout: Delay between a failed attempt and the next one, in microseconds.
    ///   - job: Produces the future to wait on. It is invoked on the application's thread pool.
    /// - Returns: A future that succeeds with the first successful result of `job`, or fails with
    ///   `Abort(.internalServerError)` once `maxRetryCount` attempts have failed.
    public func asyncRetry<T>(maxRetryCount: Int, usDelayTimeout: useconds_t, job: @escaping () throws -> EventLoopFuture<T>) throws -> EventLoopFuture<T> {
        let eventLoop = self.eventLoop
        let promise = eventLoop.makePromise(of: T.self)
        let retryDelay = TimeAmount.microseconds(Int64(usDelayTimeout))

        // Only read and written from callbacks running on `eventLoop`, so no locking is needed.
        var failedAttempts = 0

        /// Records one failed attempt; fails the promise and returns `true` when the budget is spent.
        func shouldTerminate(after error: Error) -> Bool {
            failedAttempts += 1
            guard failedAttempts < maxRetryCount else {
                self.logger.error(
                    "[Request+asyncRetry] failed to complete work after retrying.",
                    metadata: ["error": "\(error)"]
                )
                promise.fail(Abort(.internalServerError))
                return true
            }
            return false
        }

        /// Schedules the next attempt after the configured delay without blocking any thread.
        func scheduleNextAttempt() {
            let scheduled = eventLoop.scheduleTask(in: retryDelay) {
                performJob()
            }
            // The scheduled task itself cannot throw; a failure here means it never ran
            // (e.g. the event loop is shutting down), so do not leave the caller hanging.
            scheduled.futureResult.whenFailure { error in
                promise.fail(error)
            }
        }

        /// Performs a single attempt and routes its outcome back onto `eventLoop`.
        func performJob() {
            // Capture a throw from `job` as a value so that it can be told apart from the
            // thread pool refusing the work, and so that it is handled on `eventLoop`.
            let submission = self.application.threadPool.runIfActive(eventLoop: eventLoop) {
                Result<EventLoopFuture<T>, Error>(catching: job)
            }

            submission.whenComplete { outcome in
                switch outcome {
                case .failure(let error):
                    self.logger.info(
                        "[Request+asyncRetry] failed to run on threadpool",
                        metadata: ["error": "\(error)"]
                    )
                    guard !shouldTerminate(after: error) else {
                        return
                    }
                    scheduleNextAttempt()

                case .success(.failure(let error)):
                    self.logger.info(
                        "[Request+asyncRetry] job threw an exception",
                        metadata: ["error": "\(error)"]
                    )
                    guard !shouldTerminate(after: error) else {
                        return
                    }
                    scheduleNextAttempt()

                case .success(.success(let future)):
                    // `job` may return a future bound to another loop; hop so that the
                    // attempt counter is only ever touched from `eventLoop`.
                    future.hop(to: eventLoop).whenComplete { result in
                        switch result {
                        case .success(let response):
                            promise.succeed(response)
                        case .failure(let error):
                            guard !shouldTerminate(after: error) else {
                                return
                            }
                            self.logger.info(
                                "[Request+asyncRetry] work failed, retrying...",
                                metadata: ["error": "\(error)"]
                            )
                            scheduleNextAttempt()
                        }
                    }
                }
            }
        }

        performJob()
        return promise.futureResult
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment