Created
August 4, 2017 04:23
-
-
Save MingweiSamuel/0ab8c6246e52549de72413ded2b1db9d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mingweisamuel.zyra.util; | |
import java.util.Arrays; | |
import java.util.function.Supplier; | |
/**
 * <p>This class represents a token bucket system. One instance represents one recurring bucket with a certain
 * limit of tokens per timespan.</p>
 *
 * <p>A circular buffer keeps track of tokens. Each slot of the buffer stores the number of tokens handed out
 * during one fraction of the timespan; as time passes, old slots are zeroed and the current index advances.
 * The length of the buffer minus one represents an entire timespan. The extra slot prevents violations due to
 * bucket misalignment. A rolling total is kept of the buffer's contents.</p>
 *
 * <p>When trying to obtain a token, we first check that the rolling total is less than the limit. If so, a
 * token is obtained by incrementing the rolling total and the count stored at the buffer's current index.</p>
 *
 * <p>The length of the buffer is one more than the temporal factor supplied to the constructor. The temporal
 * factor represents the increase in temporal resolution provided with more buffer indices.</p>
 *
 * <p>Additionally, a non-zero spreading factor can be provided to prevent a single index from supplying all of
 * a timespan's tokens. Each index may supply at most
 * {@code totalLimit / spreadFactor / temporalFactor} tokens (rounded down, but never less than one token when
 * {@code totalLimit} is positive). A spreading factor of 0.0 therefore means no spreading, a factor of 1.0
 * limits every index to its even share of the tokens, and a factor of 0.5 allows every index up to twice its
 * even share.</p>
 *
 * <p>Checking the availability of tokens is done using the {@link #getDelay()} method. Tokens are obtained
 * using the {@link #getToken()} method. Both methods are synchronized on the instance, but because the state
 * of the bucket may change between the two calls if there are multiple threads, it is best to call them
 * together inside one synchronized block:</p>
 * <pre>{@code
 * TemporalTokenBucket bucket = ...;
 * while (true) {
 *     long delay;
 *     synchronized (bucket) {
 *         delay = bucket.getDelay();
 *         if (delay == -1) {
 *             bucket.getToken();
 *             break;
 *         }
 *     }
 *     // Waiting is done outside of the synchronized block.
 *     Thread.sleep(delay);
 * }
 * // Token is obtained.
 * ...
 * }</pre>
 */
public class TemporalTokenBucket {

    /** A time supplier. A simulated supplier can be used for debugging purposes. */
    private final Supplier<Long> timeSupplier;
    /** The maximum number of tokens that can be supplied per timespan. */
    private final int totalLimit;
    /** The maximum number of tokens a single index can supply per timespan. */
    private final int indexLimit;
    /** The timespan represented by a single index, in milliseconds. */
    private final long indexTimespan;
    /** Circular buffer storing the number of tokens supplied by each index, corresponding to a section of the
     * total time span. */
    private final int[] buffer;

    // The two fields below are only read and written while holding this instance's monitor (every entry point
    // is synchronized), so they do not need to be volatile.
    /** The rolling total of tokens supplied. */
    private int total = 0;
    /** The millisecond timestamp corresponding to the most recent update of the buffer, -1 until first use. */
    private long time = -1;

    /**
     * Creates an instance using System::currentTimeMillis as the time supplier.
     * @param timespan The time per bucket in milliseconds. Must be positive.
     * @param totalLimit The maximum number of tokens provided per timespan.
     * @param temporalFactor Temporal multiplier corresponding to token time tracking. Must be positive.
     * @param spreadFactor Factor corresponding to token supply spread (from multiple indices). Must not be
     *                     negative or NaN.
     * @throws IllegalArgumentException If an argument is outside of its documented range.
     */
    public TemporalTokenBucket(long timespan, int totalLimit, int temporalFactor, float spreadFactor) {
        this(timespan, totalLimit, temporalFactor, spreadFactor, System::currentTimeMillis);
    }

    /**
     * Creates an instance with a custom time supplier.
     * @param timespan The time per bucket in milliseconds. Must be positive.
     * @param totalLimit The maximum number of tokens provided per timespan.
     * @param temporalFactor Temporal multiplier corresponding to token time tracking. Must be positive.
     * @param spreadFactor Factor corresponding to token supply spread (from multiple indices). Must not be
     *                     negative or NaN.
     * @param timeSupplier Supplies non-descending millisecond time, useful for debugging.
     * @throws IllegalArgumentException If a numeric argument is outside of its documented range.
     * @throws NullPointerException If {@code timeSupplier} is null.
     */
    public TemporalTokenBucket(long timespan, int totalLimit, int temporalFactor, float spreadFactor,
            Supplier<Long> timeSupplier) {
        // Fail fast: these values would otherwise only surface later as a division by zero, a negative array
        // size or an out-of-bounds buffer index.
        if (timespan <= 0)
            throw new IllegalArgumentException("timespan must be positive: " + timespan);
        if (temporalFactor <= 0)
            throw new IllegalArgumentException("temporalFactor must be positive: " + temporalFactor);
        if (!(spreadFactor >= 0)) // Written this way so that NaN is rejected too.
            throw new IllegalArgumentException("spreadFactor must be non-negative: " + spreadFactor);
        if (timeSupplier == null)
            throw new NullPointerException("timeSupplier");

        this.timeSupplier = timeSupplier;
        this.totalLimit = totalLimit;
        // A spreadFactor of 0 divides to +Infinity, which the int cast saturates to Integer.MAX_VALUE,
        // i.e. no per-index limit.
        int rawIndexLimit = (int) (totalLimit / spreadFactor / temporalFactor);
        // Truncation must not produce a per-index limit of zero while tokens are allowed overall, otherwise
        // no token could ever be obtained without violating the index limit.
        this.indexLimit = (rawIndexLimit < 1 && totalLimit > 0) ? 1 : rawIndexLimit;
        this.indexTimespan = (long) Math.ceil(timespan / (double) temporalFactor);
        this.buffer = new int[temporalFactor + 1];
    }

    /**
     * Get the approximate delay til the next available token, or -1 if a token is available.
     * @return Delay in milliseconds or -1.
     */
    public synchronized long getDelay() {
        int index = update();
        if (total < totalLimit) {
            // The total allows another token, but the current index may already have supplied its share;
            // in that case the next index has to be waited for.
            if (buffer[index] >= indexLimit)
                return getTimeToBucket(1);
            return -1;
        }
        // check how soon into the future old buckets will be zeroed, making requests available.
        int i = 1;
        for (; i < buffer.length; i++) {
            if (buffer[(index + i) % buffer.length] > 0)
                break;
        }
        return getTimeToBucket(i);
    }

    /**
     * Gets a token, regardless of whether one is available.
     * @return True if the token was obtained without violating limits, false otherwise.
     */
    public synchronized boolean getToken() {
        int index = update();
        buffer[index]++;
        total++;
        return total <= totalLimit && buffer[index] <= indexLimit;
    }

    /**
     * Updates the circular buffer and {@link #time} to match the passage of time. Must be called while
     * holding this instance's monitor.
     * @return The current index.
     * @throws IllegalStateException If the time supplier went back to an earlier index.
     */
    private int update() {
        long now = timeSupplier.get();
        // The first time this is called, we initialize the time.
        if (time < 0) {
            time = now;
            return getIndex(now);
        }
        long previous = time;
        time = now;

        long length = getLength(previous, now);
        if (length < 0)
            throw new IllegalStateException("Length should be non-negative: " + length);
        int index = getIndex(previous);
        if (length == 0)
            return index;
        if (length >= buffer.length) {
            // Every slot has expired. The index must be recomputed from the new time: returning the index
            // of the previous timestamp would record upcoming tokens in the wrong slot and let them expire
            // before a full timespan has passed.
            Arrays.fill(buffer, 0);
            total = 0;
            return getIndex(now);
        }
        // Advance slot by slot, discarding the tokens of every slot that is being reused.
        for (long i = 0; i < length; i++) {
            index++;
            index %= buffer.length;
            total -= buffer[index];
            buffer[index] = 0;
        }
        if (getIndex(now) != index)
            throw new IllegalStateException("Get index time: " + getIndex(now) + ", index: " + index);
        return index;
    }

    /**
     * @param n Number of buckets in the future to look (n=1 -> next bucket).
     * @return Time until the next nth bucket in milliseconds.
     */
    private long getTimeToBucket(int n) {
        return n * indexTimespan - (time % indexTimespan);
    }

    /**
     * Gets the circular buffer index corresponding to a particular timestamp.
     * @param timestamp Millisecond timestamp.
     * @return Buffer index.
     */
    private int getIndex(long timestamp) {
        return (int) ((timestamp / indexTimespan) % buffer.length);
    }

    /**
     * Gets the index distance between two timestamps. Because the buffer is circular, the distance between
     * indices may be greater than the length of the buffer. The result is a long so that a very long idle
     * period cannot overflow into a negative distance.
     * @param startTimestamp Start millisecond timestamp.
     * @param endTimestamp End millisecond timestamp.
     * @return The index distance. May be greater than the length of the buffer.
     */
    private long getLength(long startTimestamp, long endTimestamp) {
        return endTimestamp / indexTimespan - startTimestamp / indexTimespan;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment