Skip to content

Instantly share code, notes, and snippets.

@romanbsd
Created September 21, 2023 08:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save romanbsd/b92b12fd7c5d75650b56bc1094ad7f6b to your computer and use it in GitHub Desktop.
Save romanbsd/b92b12fd7c5d75650b56bc1094ad7f6b to your computer and use it in GitHub Desktop.
asynchronously reserve permission
class MyVerticle extends AbstractVerticle {
  /** Number of nanoseconds in one millisecond, used to convert the reservation wait. */
  private static final long NANOS_PER_MILLI = 1_000_000L;

  /**
   * Decorates an asynchronous supplier so that each invocation first reserves a
   * permission from the given rate limiter without blocking the calling thread.
   *
   * <p>The value returned by {@code rateLimiter.reservePermission()} is handled as follows:
   * <ul>
   *   <li>{@code 0} - the supplier is invoked immediately;</li>
   *   <li>positive - the supplier is invoked from a Vert.x timer after that many
   *       nanoseconds, rounded up to whole milliseconds;</li>
   *   <li>negative - the returned stage fails with {@code RequestNotPermitted}.</li>
   * </ul>
   *
   * <p>The outcome of the supplied stage is reported to the rate limiter via
   * {@code onResult}/{@code onError} and then mirrored onto the returned stage.
   *
   * @param rateLimiter the limiter to reserve permissions from
   * @param supplier    produces the stage to run once permission is available
   * @param <T>         the result type of the stage
   * @return a supplier whose stages complete with the original result or failure
   */
  private <T> Supplier<CompletionStage<T>> decorateCompletionStage(
      RateLimiter rateLimiter,
      Supplier<CompletionStage<T>> supplier) {
    return () -> {
      final CompletableFuture<T> promise = new CompletableFuture<>();
      final BiConsumer<T, Throwable> biConsumer = (result, throwable) -> {
        if (throwable != null) {
          rateLimiter.onError(throwable);
          promise.completeExceptionally(throwable);
        } else {
          rateLimiter.onResult(result);
          promise.complete(result);
        }
      };
      try {
        final long waitNano = rateLimiter.reservePermission();
        if (waitNano == 0) {
          supplier.get().whenComplete(biConsumer);
        } else if (waitNano > 0) {
          // Round UP to whole milliseconds: truncation would turn sub-millisecond
          // waits into 0 (which the timer rejects) and could fire before the
          // reserved permission is valid. Written without addition to avoid overflow.
          final long waitMs =
              waitNano / NANOS_PER_MILLI + (waitNano % NANOS_PER_MILLI == 0 ? 0 : 1);
          LOG.warn("Waiting {}ms", waitMs);
          vertx.setTimer(waitMs, u -> {
            // This callback runs outside the enclosing try block, so a synchronous
            // failure of the supplier must be handled here or the promise would
            // never complete.
            try {
              supplier.get().whenComplete(biConsumer);
            } catch (Exception exception) {
              rateLimiter.onError(exception);
              promise.completeExceptionally(exception);
            }
          });
        } else {
          promise.completeExceptionally(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        }
      } catch (Exception exception) {
        rateLimiter.onError(exception);
        promise.completeExceptionally(exception);
      }
      return promise;
    };
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment