Skip to content

Instantly share code, notes, and snippets.

@kimathie
Last active December 16, 2020 21:01
Show Gist options
  • Save kimathie/5f1e409caadf0f96bf2423c27a34673a to your computer and use it in GitHub Desktop.
Save kimathie/5f1e409caadf0f96bf2423c27a34673a to your computer and use it in GitHub Desktop.
Hash timing wheel
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
 * A timing wheel is a data structure that provides the ability to manage an
 * event/action that should take place within a specified time window. The idea
 * is to have a time facility that schedules and manages timeouts efficiently,
 * as described by George Varghese and Tony Lauck.
 *
 * This implementation is a hashed wheel timer with only two parameters to
 * trade latency against cost.
 *
 * <h3>Interval</h3>
 *
 * This is the period of one tick of the wheel. On every tick the worker thread
 * performs three actions, in this order:
 * <p>
 * 1. Remove timers that were cancelled since the previous tick.</p>
 * <p>
 * 2. Insert newly scheduled timers into the wheel.</p>
 * <p>
 * 3. Visit the bucket of the current tick and run the timers in it whose
 * deadline has passed.</p>
 *
 * <p>
 * A timer therefore runs at most about one interval after its deadline (plus
 * the time the worker spends running other tasks). A lower interval increases
 * the accuracy of the timer at the cost of more frequent wake-ups. The default
 * is 100ms; intervals shorter than one millisecond are rounded up to 1ms.
 * </p>
 *
 * <h3>Wheel Size</h3>
 * This is the number of buckets in the wheel. A timer is stored in the bucket
 * that the worker will visit on the first tick at or after the timer's
 * deadline; timers that are more than one rotation away share buckets with
 * nearer ones and are simply skipped until their deadline has passed. A larger
 * wheel spreads a large number of timeouts over more buckets. The requested
 * size is rounded up to the next prime number; the default request of 64
 * yields 67 buckets.
 *
 * <h3>Threading</h3>
 * {@link #schedule(TimerTask, long, TimeUnit)} and {@link Timeout#cancel()}
 * only enqueue work on concurrent queues and may be called from any thread.
 * The buckets themselves are touched only by the worker thread, which is also
 * the thread that runs every {@link TimerTask}.
 *
 * @author kimathie
 */
public class TimerWheel {

    private static final int STARTED = 0;
    private static final int STOPPED = 1;

    /** Length of one tick in milliseconds; always at least 1. */
    private final long interval;
    /** {@link #interval} expressed in nanoseconds, used for slot arithmetic. */
    private final long intervalNanos;
    private final TimeoutBucket[] wheel;
    /** Timers handed over by {@link #schedule} that the worker has not yet placed in a bucket. */
    private final ConcurrentLinkedQueue<Participant> scheduled;
    /** Timers cancelled by callers that the worker has not yet unlinked from their bucket. */
    private final ConcurrentLinkedQueue<Participant> cancelled;
    private final AtomicInteger state = new AtomicInteger(STOPPED);
    /** Worker of the most recent {@link #start()}; written by start(), read by stop() and the worker. */
    private volatile Thread thread;

    /**
     * Creates a wheel with a 100ms tick and a requested size of 64 buckets.
     */
    public TimerWheel() {
        this(100, TimeUnit.MILLISECONDS, 64);
    }

    /**
     * Creates a wheel with the given tick length and a requested size of 64 buckets.
     *
     * @param interval the tick length
     * @param unit the unit of {@code interval}
     */
    public TimerWheel(long interval, TimeUnit unit) {
        this(interval, unit, 64);
    }

    /**
     * Creates a wheel with a 100ms tick and the given requested size.
     *
     * @param wheelSize the requested number of buckets
     */
    public TimerWheel(int wheelSize) {
        this(100, TimeUnit.MILLISECONDS, wheelSize);
    }

    /**
     * Creates a wheel with the given tick length and requested size.
     *
     * @param interval the tick length; values that convert to less than one
     * millisecond are rounded up to 1ms
     * @param unit the unit of {@code interval}
     * @param wheelSize the requested number of buckets, between 1 and 2^30;
     * rounded up to the next prime
     * @throws IllegalArgumentException if {@code interval} is negative or
     * {@code wheelSize} is out of range
     */
    public TimerWheel(long interval, TimeUnit unit, int wheelSize) {
        if (interval < 0) {
            // Previously accepted here and only failed later, inside the worker's Thread.sleep.
            throw new IllegalArgumentException(
                    "Interval must not be negative: " + interval);
        }
        // A 0ms tick would make the worker busy-spin and break the tick arithmetic.
        this.interval = Math.max(1L, unit.toMillis(interval));
        this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(this.interval);
        this.wheel = createWheel(wheelSize);
        this.scheduled = new ConcurrentLinkedQueue<Participant>();
        this.cancelled = new ConcurrentLinkedQueue<Participant>();
    }

    /**
     * Starts the worker thread. Does nothing if the wheel is already started.
     */
    public void start() {
        // Flip the state BEFORE launching the worker: the worker exits as soon as
        // it observes STOPPED, and the CAS also makes concurrent start() calls
        // launch exactly one thread.
        if (state.compareAndSet(STOPPED, STARTED)) {
            Thread worker = new Thread(new Worker(), "timer-wheel-worker");
            thread = worker;
            worker.start();
        }
    }

    /**
     * Stops the worker thread. Does nothing if the wheel is not started.
     * Timers that are still pending are neither run nor discarded by this call.
     */
    public void stop() {
        if (state.compareAndSet(STARTED, STOPPED)) {
            Thread worker = thread;
            // May still be null if stop() races with a start() that has not
            // published its thread yet; that worker will see STOPPED and exit.
            if (worker != null) {
                worker.interrupt();
            }
        }
    }

    /**
     * Schedules {@code task} to run once after the given delay. The task is
     * queued immediately and picked up by the worker on its next tick, so it
     * only runs while the wheel is started.
     *
     * @param task the task to run
     * @param delay how long to wait before running the task; zero or negative
     * means "on the next tick"
     * @param unit the unit of {@code delay}
     * @return a handle that can be used to cancel the task or query its state
     * @throws NullPointerException if {@code task} or {@code unit} is null
     */
    public Timeout schedule(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // The sum may wrap around; every comparison against it is done by
        // subtraction, which stays correct across the wrap.
        long deadline = System.nanoTime() + unit.toNanos(delay);
        Participant p = new Participant(this, task, deadline);
        scheduled.add(p);
        return p;
    }

    /**
     * The loop executed by the worker thread: one iteration per tick.
     */
    class Worker implements Runnable {

        @Override
        public void run() {
            int index = 0;
            long tickTime = System.nanoTime();
            try {
                while (STARTED == state.get()) {
                    tickTime = awaitNextTick(tickTime + intervalNanos);
                    processCancelled();
                    processScheduled(index, tickTime);
                    processTimeouts(index);
                    index = (index + 1) % wheel.length;
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting, normally by stop(). Only the current
                // worker may mark the wheel stopped; otherwise a worker left over
                // from a stop()/start() cycle would shut down its successor.
                if (thread == Thread.currentThread()) {
                    state.compareAndSet(STARTED, STOPPED);
                }
            } finally {
                cleanUp();
            }
        }

        /**
         * Sleeps for one interval and returns the time at which the tick begins.
         *
         * @param earliest the tick must not begin before this
         * {@link System#nanoTime()} value
         * @return the {@link System#nanoTime()} reading taken when the tick begins
         */
        private long awaitNextTick(long earliest) throws InterruptedException {
            Thread.sleep(interval);
            long now = System.nanoTime();
            // Guard against a sleep that returns marginally early: slots are
            // assigned assuming consecutive ticks are at least one interval
            // apart, and a timer visited just before its deadline would
            // otherwise have to wait a full rotation.
            while (now - earliest < 0) {
                Thread.sleep(1);
                now = System.nanoTime();
            }
            return now;
        }

        private void cleanUp() {
            //Yet to implement but ideally should drain all timers and expire them gracefully
        }

        /** Unlinks every timer cancelled since the previous tick from its bucket. */
        private void processCancelled() {
            Participant p;
            while ((p = cancelled.poll()) != null) {
                p.remove();
            }
        }

        /**
         * Moves every newly scheduled timer into the bucket that will be visited
         * on the first tick at or after its deadline.
         *
         * @param index the bucket visited by the current tick
         * @param now the time at which the current tick began
         */
        private void processScheduled(int index, long now) {
            Participant p;
            while ((p = scheduled.poll()) != null) {
                if (p.isCancelled()) {
                    // Cancelled before it ever reached the wheel: nothing to unlink.
                    continue;
                }
                long remaining = p.deadline - now;
                long ticksAhead = 0L;
                if (remaining > 0) {
                    // Round up so the bucket is never visited before the deadline.
                    ticksAhead = remaining / intervalNanos;
                    if (remaining % intervalNanos != 0) {
                        ticksAhead++;
                    }
                }
                // Already-due timers land in the current bucket, which is
                // expired right after this method returns.
                int slot = (int) ((index + ticksAhead) % wheel.length);
                wheel[slot].addTimeout(p);
            }
        }

        /** Runs the due timers of the bucket belonging to the current tick. */
        private void processTimeouts(int index) {
            TimeoutBucket bucket = wheel[index];
            bucket.expireTimeouts(System.nanoTime());
        }
    }

    /**
     * One slot of the wheel: a doubly linked list of timers. Not thread-safe;
     * within this class it is only used from the worker thread.
     */
    class TimeoutBucket {

        private Participant head;
        private Participant tail;

        /**
         * Add {@link Timeout} to this bucket.
         */
        public void addTimeout(Participant p) {
            p.bucket = this;
            if (head == null) {
                head = tail = p;
            } else {
                tail.next = p;
                p.prev = tail;
                tail = p;
            }
        }

        /**
         * Removes and runs every timer whose deadline is at or before the given
         * time, and removes timers that have been cancelled. Timers that are not
         * yet due stay in the bucket.
         *
         * @param deadline the current {@link System#nanoTime()} value
         */
        public void expireTimeouts(long deadline) {
            Participant p = head;
            // process all timeouts
            while (p != null) {
                Participant next = p.next;
                if (p.isStale(deadline)) {
                    next = remove(p);
                    p.expire();
                } else if (p.isCancelled()) {
                    next = remove(p);
                }
                p = next;
            }
        }

        /**
         * Unlinks the given timer from this bucket.
         *
         * @return the timer that followed {@code p}, or null if it was the last
         */
        public Participant remove(Participant p) {
            Participant next = p.next;
            // remove timeout that was either processed or cancelled by updating the linked-list
            if (p.prev != null) {
                p.prev.next = next;
            }
            if (p.next != null) {
                p.next.prev = p.prev;
            }
            if (p == head) {
                // if timeout is also the tail we need to adjust the entry too
                if (p == tail) {
                    tail = null;
                    head = null;
                } else {
                    head = next;
                }
            } else if (p == tail) {
                // if the timeout is the tail modify the tail to be the prev node.
                tail = p.prev;
            }
            // null out prev, next and bucket to allow for GC.
            p.prev = null;
            p.next = null;
            p.bucket = null;
            return next;
        }
    }

    /**
     * A scheduled task together with its deadline, its state and its links
     * within a {@link TimeoutBucket}. This is the {@link Timeout} handle
     * returned by {@link TimerWheel#schedule(TimerTask, long, TimeUnit)}.
     */
    class Participant implements Timeout {

        // Compile-time constants, so they may be static even in an inner class.
        private static final int CREATED = 0;
        private static final int CANCELLED = 1;
        private static final int EXPIRED = 2;

        /** Absolute deadline on the {@link System#nanoTime()} clock. */
        public long deadline;
        public TimeoutBucket bucket;
        public Participant next;
        public Participant prev;
        private final AtomicInteger state = new AtomicInteger(CREATED);
        private final TimerTask task;
        private final TimerWheel timer;

        public Participant(TimerWheel timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }

        /**
         * Returns whether the deadline is at or before {@code time}.
         *
         * @param time a {@link System#nanoTime()} reading
         */
        public boolean isStale(long time) {
            // nanoTime values may wrap around, so compare by subtraction
            // rather than with >=.
            return time - deadline >= 0;
        }

        public int state() {
            return state.get();
        }

        public long deadline() {
            return deadline;
        }

        @Override
        public TimerTask task() {
            return task;
        }

        @Override
        public boolean cancel() {
            if (!state.compareAndSet(CREATED, CANCELLED)) {
                return false;
            }
            // Only the worker may touch the buckets, so ask it to unlink us.
            timer.cancelled.add(this);
            return true;
        }

        @Override
        public boolean isCancelled() {
            return state() == CANCELLED;
        }

        @Override
        public boolean isExpired() {
            return state() == EXPIRED;
        }

        /**
         * Runs the task unless this timer has already been cancelled or expired.
         */
        public void expire() {
            if (!state.compareAndSet(CREATED, EXPIRED)) {
                return;
            }
            try {
                task.run(this);
            } catch (Exception t) {
                // A failing task must not terminate the worker thread.
                t.printStackTrace();
            }
        }

        /**
         * Unlinks this timer from the bucket it is currently stored in, if any.
         */
        public void remove() {
            TimeoutBucket b = this.bucket;
            if (b != null) {
                b.remove(this);
            }
        }
    }

    /**
     * Validates the requested size and allocates the buckets.
     *
     * @throws IllegalArgumentException if {@code wheelSize} is not between 1 and 2^30
     */
    private TimeoutBucket[] createWheel(int wheelSize) {
        if (wheelSize <= 0) {
            throw new IllegalArgumentException(
                    "Wheel size must be greater than 0: " + wheelSize);
        }
        if (wheelSize > 1073741824) {
            throw new IllegalArgumentException(
                    "Wheel size may not be greater than 2^30: " + wheelSize);
        }
        // Round up to a prime number of buckets.
        wheelSize = nearestPrime(wheelSize);
        TimeoutBucket[] timeoutWheel = new TimeoutBucket[wheelSize];
        for (int i = 0; i < timeoutWheel.length; i++) {
            timeoutWheel[i] = new TimeoutBucket();
        }
        return timeoutWheel;
    }

    /** Returns the smallest prime that is greater than or equal to {@code candidate}. */
    private int nearestPrime(int candidate) {
        while (!isPrime(candidate)) {
            candidate++;
        }
        return candidate;
    }

    /** Trial division; numbers below 2 are not prime. */
    private boolean isPrime(int candidate) {
        if (candidate < 2) {
            return false;
        }
        // Widen to long so i * i cannot overflow for candidates near 2^30.
        for (int i = 2; (long) i * i <= candidate; i++) {
            if (candidate % i == 0) {
                return false;
            }
        }
        return true;
    }
}
/**
* A handle associated with a {@link TimerTask} that is returned by a
* {@link Timer}.
*/
static interface Timeout {
/**
* Returns the {@link TimerTask} which is associated with this handle.
*
* @return
*/
public abstract TimerTask task();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated with
* this handle has been expired.
*
* @return
*/
public abstract boolean isExpired();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated with
* this handle has been cancelled.
*
* @return
*/
public abstract boolean isCancelled();
/**
* Attempts to cancel the {@link TimerTask} associated with this handle. If
* the task has been executed or cancelled already, it will return with no
* side effect.
*
* @return True if the cancellation completed successfully, otherwise false
*/
public abstract boolean cancel();
}
/**
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*/
static interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*
* @param timeout a handle which is associated with this task
* @throws java.lang.Exception
*/
public abstract void run(Timeout timeout) throws Exception;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment