Skip to content

Instantly share code, notes, and snippets.

@Alex1304
Last active May 25, 2019 13:49
Show Gist options
  • Save Alex1304/eafdbcc94f18b692a28ef36ec1b096db to your computer and use it in GitHub Desktop.
Save Alex1304/eafdbcc94f18b692a28ef36ec1b096db to your computer and use it in GitHub Desktop.
package com.github.alex1304.ultimategdbot.core;
import java.time.Duration;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import discord4j.rest.request.GlobalRateLimiter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* <p>Custom implementation of {@link GlobalRateLimiter} that accepts a throughput
* parameter, and adjusts the delay of requests in a such way that the effective
* throughput complies with the targeted one.</p>
*
* <p>For example, if the parameter is set to 25, this limiter will allow a maximum
* throughput of 25 requests per second.</p>
*
* <p>The effective throughput may be lower than the specified one if Discord's
* global rate limit is being reached.</p>
*
* @author Alex1304
*/
public class FixedThroughputGlobalRateLimiter implements GlobalRateLimiter {
private static final Logger LOGGER = LoggerFactory.getLogger(FixedThroughputGlobalRateLimiter.class);
private final long delayStepNanos;
private volatile long globallyRateLimitedUntil = 0;
private volatile long throughputLimitedUntil = 0;
/**
* Creates a {@link FixedThroughputGlobalRateLimiter} with a specific throughput value.
*
* @param throughput the target thoughput value, in requests per second
*/
public FixedThroughputGlobalRateLimiter(int throughput) {
if (throughput < 1) {
throw new IllegalArgumentException("throughput must be >= 1");
}
this.delayStepNanos = 1_000_000_000 / throughput;
}
/**
* Notifies when the request is ready to be sent.
*
* @return a Mono that completes when this limiter is ready to allow for more requests
*/
private Mono<Void> notifier() {
var now = System.nanoTime();
throughputLimitedUntil = Math.max(throughputLimitedUntil + delayStepNanos, now);
return Mono.delay(Duration.ofNanos(Math.max(globallyRateLimitedUntil, throughputLimitedUntil) - now))
.then(Mono.fromRunnable(() -> LOGGER.debug("Permit!")));
}
/**
* {@inheritDoc}
*/
@Override
public void rateLimitFor(Duration duration) {
globallyRateLimitedUntil = System.nanoTime() + duration.toNanos();
}
/**
* {@inheritDoc}
*/
@Override
public Duration getRemaining() {
var remaining = globallyRateLimitedUntil - System.nanoTime();
if (remaining > 0) {
LOGGER.debug("On hold!");
}
return Duration.ofNanos(remaining);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Flux<T> withLimiter(Publisher<T> stage) {
return Flux.defer(() -> notifier().thenMany(stage));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment