Skip to content

Instantly share code, notes, and snippets.

@wkgcass
Last active January 10, 2021 12:07
Show Gist options
  • Save wkgcass/929f9bee30b946dd830e2a4a1a669403 to your computer and use it in GitHub Desktop.
Save wkgcass/929f9bee30b946dd830e2a4a1a669403 to your computer and use it in GitHub Desktop.
TimeWheel example (TODO)
package vproxybase.util.time.timewheel;
import vproxybase.util.LogType;
import vproxybase.util.Logger;
import vproxybase.util.time.TimeElem;
/**
 * A value paired with the timestamp at which it times out.
 *
 * <p>The {@code polled} and {@code canceled} flags are package-private state:
 * {@code TimeoutQueue} sets {@code polled} when it hands the value out and
 * skips elements whose {@code canceled} flag is set.
 *
 * @param <T> type of the carried value
 */
public class TimeElemImpl<T> implements TimeElem<T> {
    /** Timestamp at which this element times out. */
    final long timeoutTs;
    /** The carried value, returned by {@link #get()}. */
    private final T value;
    /** Set once the element has been handed out by the timeout queue. */
    boolean polled = false;
    /** Set once {@link #removeSelf()} has canceled the element. */
    boolean canceled = false;

    /**
     * @param timeoutTs timestamp at which the element times out
     * @param value     value to carry
     */
    public TimeElemImpl(long timeoutTs, T value) {
        this.value = value;
        this.timeoutTs = timeoutTs;
    }

    /** @return the value given at construction time */
    @Override
    public T get() {
        return this.value;
    }

    /**
     * Cancels this element so that it is skipped when it would be polled.
     * Canceling more than once is a no-op; canceling an element that was
     * already polled only logs a warning and leaves the flags untouched.
     */
    @Override
    public void removeSelf() {
        if (!polled) {
            // marking an already-canceled element again changes nothing
            canceled = true;
            return;
        }
        Logger.warn(LogType.IMPROPER_USE, "canceling a time event while it's already handled");
    }
}
package vproxybase.util.time.timewheel;
import java.util.LinkedList;
import java.util.List;
/**
 * FIFO queue of time elements that are ready to be handed out.
 *
 * <p>Elements that are canceled, or that were already handed out by
 * {@link #poll()}, are considered dead: they are rejected by {@link #add},
 * skipped by {@link #poll()} and ignored by {@link #isEmpty()}. Skipping
 * already-polled elements guarantees that an element which was enqueued more
 * than once is still delivered at most once.
 *
 * @param <E> type of the values carried by the queued elements
 */
public class TimeoutQueue<E> {
    private final LinkedList<TimeElemImpl<E>> queue = new LinkedList<>();

    /**
     * Appends the element unless it is canceled or was already polled.
     *
     * @param e element to enqueue
     */
    public void add(TimeElemImpl<E> e) {
        if (isDead(e)) {
            return;
        }
        queue.add(e);
    }

    /**
     * Removes and returns the value of the first live element, marking that
     * element as polled. Dead elements in front of it are discarded.
     *
     * @return the value of the first live element, or {@code null} when the
     *         queue holds no live element
     */
    public E poll() {
        TimeElemImpl<E> e;
        while ((e = queue.pollFirst()) != null) {
            if (isDead(e)) {
                continue;
            }
            e.polled = true;
            return e.get();
        }
        return null;
    }

    /**
     * Enqueues every element of the list through {@link #add}, so dead
     * elements are filtered out.
     *
     * @param list elements to enqueue, in order
     */
    public void addAll(List<TimeElemImpl<E>> list) {
        for (var e : list) {
            add(e);
        }
    }

    /**
     * Tells whether {@link #poll()} would currently return nothing.
     * Dead elements at the head are dropped first, so a queue that only
     * contains canceled elements reports empty.
     *
     * @return {@code true} when there is no live element in the queue
     */
    public boolean isEmpty() {
        TimeElemImpl<E> head;
        while ((head = queue.peekFirst()) != null && isDead(head)) {
            queue.pollFirst();
        }
        // either the queue is drained or its head is a live element
        return queue.isEmpty();
    }

    /** An element is dead once it has been canceled or already handed out. */
    private static boolean isDead(TimeElemImpl<?> e) {
        return e.canceled || e.polled;
    }
}
package vproxybase.util.time.timewheel;
import vproxybase.util.time.TimeElem;
import vproxybase.util.time.TimeQueue;
import java.util.PriorityQueue;
/**
 * Hierarchical time wheel (work in progress).
 *
 * <p>The wheel consists of several {@link TimeWheelLayer}s. Layer 0 uses
 * {@code layer0Tick} as its tick; the tick of every further layer is the tick
 * of the layer below multiplied by that layer's capacity. Elements that have
 * timed out are moved into an internal {@link TimeoutQueue}, from which
 * {@link #poll()} hands them out.
 *
 * <p>{@link #add} and {@link #nextTime} are not implemented yet and throw
 * {@link UnsupportedOperationException}.
 *
 * @param <T> type of the values stored in the wheel
 */
public class TimeWheel<T> implements TimeQueue<T> {
    private final TimeoutQueue<T> timeoutQueue = new TimeoutQueue<>();
    private final TimeWheelLayer<T>[] layers;
    // Long.compare instead of casting the difference to int: the cast truncates
    // (and may flip the sign of) differences that do not fit into 32 bits
    private final PriorityQueue<TimeElemImpl<T>> backlogQueue =
        new PriorityQueue<>((a, b) -> Long.compare(a.timeoutTs, b.timeoutTs));
    private long last; // last timestamp passed to update(); 0 means "not initialized"

    /**
     * @param capacityOfEachLayer number of slots of each layer, lowest layer first
     * @param layer0Tick          tick of the lowest layer
     * @throws IllegalArgumentException if no layer is given, or a tick or
     *                                  capacity is not positive
     */
    public TimeWheel(int[] capacityOfEachLayer, int layer0Tick) {
        if (capacityOfEachLayer.length == 0) {
            throw new IllegalArgumentException("at least one layer is required");
        }
        if (layer0Tick <= 0) {
            throw new IllegalArgumentException("layer0Tick must be positive: " + layer0Tick);
        }
        //noinspection unchecked
        layers = new TimeWheelLayer[capacityOfEachLayer.length];
        int tick = layer0Tick;
        for (int layer = 0; layer < capacityOfEachLayer.length; ++layer) {
            int cap = capacityOfEachLayer[layer];
            if (cap <= 0) {
                throw new IllegalArgumentException("capacity of layer " + layer + " must be positive: " + cap);
            }
            layers[layer] = new TimeWheelLayer<>(tick, layer, cap);
            tick *= cap;
        }
    }

    /**
     * Not implemented yet. The wheel is advanced to {@code current} before
     * the exception is thrown.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public TimeElem<T> add(long current, int timeout, T elem) {
        update(current);
        // TODO: store the element into a layer slot or into the backlog queue
        throw new UnsupportedOperationException("TimeWheel.add is not implemented yet");
    }

    /**
     * Hands out the next timed-out value.
     *
     * @return the next timed-out value, or {@code null} when the timeout
     *         queue holds nothing
     */
    @Override
    public T poll() {
        // NOTE(review): the original left a TODO for the "nothing ready" case;
        // returning null mirrors TimeoutQueue.poll() - confirm against the TimeQueue contract
        return timeoutQueue.poll();
    }

    /**
     * @return {@code true} when neither the timeout queue, the backlog queue
     *         nor any layer slot holds an element that is still pending
     */
    @Override
    public boolean isEmpty() {
        if (!timeoutQueue.isEmpty()) {
            return false;
        }
        if (hasPending(backlogQueue)) {
            return false;
        }
        for (var layer : layers) {
            for (var slot : layer.slots) {
                if (hasPending(slot)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Advances the wheel to {@code current} and returns 0 when a timed-out
     * element can be polled right away. Computing the waiting time for the
     * other case is not implemented yet.
     *
     * @throws UnsupportedOperationException when nothing can be polled right away
     */
    @Override
    public int nextTime(long current) {
        update(current);
        if (!timeoutQueue.isEmpty()) {
            return 0; // may directly poll
        }
        // TODO: compute the time until the next element times out
        throw new UnsupportedOperationException("TimeWheel.nextTime is not implemented yet");
    }

    /** Tells whether the collection holds an element that is neither canceled nor polled. */
    private static <E> boolean hasPending(Iterable<TimeElemImpl<E>> elements) {
        for (var e : elements) {
            if (!e.canceled && !e.polled) {
                return true;
            }
        }
        return false;
    }

    /** Advances every layer to the given timestamp and collects timed-out elements. */
    private void update(long current) {
        if (last == current) {
            return; // no need to update
        }
        if (last == 0) {
            last = current;
            for (var layer : layers) {
                layer.curSlotTs = current;
            }
            return; // not initialized yet, no need to process elements
        }
        int updatedLayer; // the layers whose index <= updatedLayer are updated
        for (updatedLayer = 0; updatedLayer < layers.length; ++updatedLayer) {
            var layer = layers[updatedLayer];
            // divide in long arithmetic so that a large time difference is not truncated first
            int off = layer.off + (int) ((current - layer.curSlotTs) / layer.tick);
            layer.flush(layer.off, off, timeoutQueue);
            layer.curSlotTs += (long) (off - layer.off) * layer.tick;
            if (off < layer.cap) {
                layer.off = off;
                break; // still within this layer after updating
            } else {
                layer.off = off % layer.cap;
            }
        }
        if (updatedLayer == layers.length) {
            // all layers are flushed
            // handle those added into wrapped slots in the highest layer
            var layer = layers[layers.length - 1];
            layer.flush(0, layer.off, timeoutQueue);
            // the loop ran past the last layer: point back at the highest one so that
            // flatten() does not index outside of the array
            // NOTE(review): the original called flatten() with layers.length here - confirm
            // that flattening from the highest layer is the intended behavior
            updatedLayer = layers.length - 1;
        }
        flatten(updatedLayer);
        // remember the timestamp, otherwise the "no need to update" shortcut only ever
        // matches the very first timestamp
        last = current;
    }

    /** Moves the current slot of each updated layer down into the layer below it. */
    private void flatten(int updatedLayer) {
        // extract time elements from higher layer to lower layer
        for (int i = updatedLayer; i > 0; --i) {
            var layer = layers[i];
            var lower = layers[i - 1];
            var elements = layer.slots[layer.off];
            for (var e : elements) {
                long delta = e.timeoutTs - lower.curSlotTs;
                if (delta < 0) {
                    // the deadline lies before the lower layer's current slot: it has
                    // already passed, storing it would park it in a slot behind the cursor
                    timeoutQueue.add(e);
                    continue;
                }
                // release to lower layer
                lower.store(lower.off + (int) (delta / lower.tick), e);
            }
            elements.clear();
        }
        // for layer0, current tick events also should be flushed
        var layer0 = layers[0];
        layer0.flush(layer0.off, layer0.off + 1, timeoutQueue);
    }
}
package vproxybase.util.time.timewheel;
import java.util.LinkedList;
import java.util.List;
/**
 * One layer of a time wheel: a fixed array of slots, each holding the time
 * elements stored under that slot index.
 *
 * @param <T> type of the values carried by the stored elements
 */
public class TimeWheelLayer<T> {
    int off = 0; // current offset
    long curSlotTs; // timestamp for current slot
    public final int tick; // time interval between buckets
    public final int layer; // layer of this wheel, starting from 0 with the smallest tick
    public final int cap; // number of slots in this layer
    public final List<TimeElemImpl<T>>[] slots;

    /**
     * @param tick  time interval between two slots
     * @param layer index of this layer, 0 being the lowest
     * @param cap   number of slots to allocate
     */
    public TimeWheelLayer(int tick, int layer, int cap) {
        this.tick = tick;
        this.layer = layer;
        this.cap = cap;
        //noinspection unchecked
        slots = new List[cap];
        for (int i = 0; i < cap; ++i) {
            slots[i] = new LinkedList<>();
        }
    }

    /**
     * Appends the element to the slot at the given index.
     *
     * @param index slot index, must be within {@code [0, cap)}
     * @param elem  element to store
     */
    public void store(int index, TimeElemImpl<T> elem) {
        slots[index].add(elem);
    }

    /**
     * Moves the content of the slots {@code [from, min(to, cap))} into the
     * timeout queue. Each flushed slot is emptied, so an element is handed to
     * the timeout queue only once even when the same slot is flushed again.
     *
     * @param from         first slot index to flush (inclusive)
     * @param to           last slot index to flush (exclusive); values beyond
     *                     {@code cap} are cut off at {@code cap}
     * @param timeoutQueue queue receiving the flushed elements
     */
    public void flush(int from, int to, TimeoutQueue<T> timeoutQueue) {
        for (int i = from; i < cap && i < to; ++i) {
            var slot = slots[i];
            timeoutQueue.addAll(slot);
            // without clearing, the same elements would be flushed again the next
            // time this slot index is passed
            slot.clear();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment