Skip to content

Instantly share code, notes, and snippets.

@romanbsd
Created October 12, 2023 09:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save romanbsd/1207ed00187afc49ab569e7698ea29ea to your computer and use it in GitHub Desktop.
Save romanbsd/1207ed00187afc49ab569e7698ea29ea to your computer and use it in GitHub Desktop.
/**
 * Decorates asynchronous calls with a {@code RateLimiter} without blocking the calling thread:
 * when the limiter asks the caller to wait, the call is postponed with a Vert.x timer instead
 * of sleeping.
 *
 * <p>NOTE(review): the limiter API used here ({@code reservePermission}, {@code onResult},
 * {@code onError}, {@code RequestNotPermitted}) looks like Resilience4j's; the sign convention
 * relied on below (negative = permission denied, otherwise nanoseconds to wait) should be
 * confirmed against the limiter version in use.
 */
class VertxRetryHelper {
    private static final Logger LOG = LoggerFactory.getLogger(VertxRetryHelper.class);

    /** Number of nanoseconds in one millisecond, used to convert the limiter's wait time. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final Vertx vertx;

    /**
     * @param vertx Vert.x instance whose timers are used to postpone rate-limited calls
     */
    public VertxRetryHelper(Vertx vertx) {
        this.vertx = vertx;
    }

    /**
     * Wraps {@code supplier} so that every invocation first reserves a permission from
     * {@code rateLimiter}.
     *
     * <ul>
     *   <li>No wait required: the supplier is invoked immediately.</li>
     *   <li>Positive wait: the supplier is invoked from a Vert.x timer once the wait, rounded
     *       up to whole milliseconds, has elapsed.</li>
     *   <li>Negative reservation result: the returned stage fails with
     *       {@code RequestNotPermitted} and the supplier is never invoked.</li>
     * </ul>
     *
     * <p>The outcome of the supplied stage is reported to the limiter through
     * {@code onResult}/{@code onError} and then mirrored onto the returned stage. An exception
     * thrown while reserving, scheduling, or invoking the supplier is reported through
     * {@code onError} and fails the returned stage instead of propagating to the caller.
     *
     * @param rateLimiter limiter consulted before each invocation
     * @param supplier    the asynchronous call to protect
     * @param <T>         result type of the asynchronous call
     * @return a supplier producing rate-limited completion stages
     */
    public <T> Supplier<CompletionStage<T>> decorateCompletionStage(
            RateLimiter rateLimiter,
            Supplier<CompletionStage<T>> supplier) {
        return () -> {
            final CompletableFuture<T> promise = new CompletableFuture<>();
            try {
                // Inspect the raw nanosecond value BEFORE converting it: integer division
                // truncates toward zero, so a "denied" result such as -1 would otherwise
                // become 0 ms and be mistaken for "run immediately".
                final long waitNanos = rateLimiter.reservePermission();
                if (waitNanos < 0) {
                    promise.completeExceptionally(
                            RequestNotPermitted.createRequestNotPermitted(rateLimiter));
                } else if (waitNanos == 0) {
                    invoke(rateLimiter, supplier, promise);
                } else {
                    // Round up so sub-millisecond waits are still honoured and the timer
                    // delay is never below 1 ms.
                    final long waitMs = waitNanos / NANOS_PER_MILLI
                            + (waitNanos % NANOS_PER_MILLI == 0 ? 0 : 1);
                    LOG.warn("Waiting {}ms", waitMs);
                    vertx.setTimer(waitMs, timerId -> invoke(rateLimiter, supplier, promise));
                }
            } catch (Exception exception) {
                rateLimiter.onError(exception);
                promise.completeExceptionally(exception);
            }
            return promise;
        };
    }

    /**
     * Invokes the supplier and forwards the outcome of its stage to the limiter and the promise.
     * Failures of the invocation itself (supplier throws or returns {@code null}) are handled
     * here so that a call made from a timer callback can never leave the promise pending.
     */
    private static <T> void invoke(
            RateLimiter rateLimiter,
            Supplier<CompletionStage<T>> supplier,
            CompletableFuture<T> promise) {
        try {
            supplier.get().whenComplete((result, throwable) -> {
                if (throwable != null) {
                    rateLimiter.onError(throwable);
                    promise.completeExceptionally(throwable);
                } else {
                    rateLimiter.onResult(result);
                    promise.complete(result);
                }
            });
        } catch (Exception exception) {
            rateLimiter.onError(exception);
            promise.completeExceptionally(exception);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment