Skip to content

Instantly share code, notes, and snippets.

@wuyongzheng
Created February 13, 2015 08:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wuyongzheng/2e16797d24938b50a4f0 to your computer and use it in GitHub Desktop.
Save wuyongzheng/2e16797d24938b50a4f0 to your computer and use it in GitHub Desktop.
/* Discretionary Round Robin (DRR)
A DRR tries to achieve the same criteria as a Round Robin Queue:
1. No starvation.
2. Traffic shaping: throughput and burst guarantees.
Instead of controlling _when_ to send, as a normal traffic-shaping queue does,
DRR decides _whether_ to send. When a sender wants to send a packet, it first
calls Flow.send(size, minSize). DRR either returns "Yes. You can send now" or
"No. Check with me again after t milliseconds."
*/
//import java.util.*;
/**
 * A shared token bucket that several {@link Flow}s poll before sending.
 *
 * <p>Tokens (bytes) are credited lazily, at {@code throughput} bytes per second,
 * each time a flow calls {@link Flow#send(int, int)}. Flows are kept in a circular
 * doubly linked list; a flow may only use the tokens left over after the traffic
 * announced by every flow queued in front of it, and moves to the tail of the
 * list once it has been granted something.
 *
 * <p>All mutable state is guarded by the monitor of this object.
 */
public class DiscretionaryRoundRobin {
    private int throughput; // bytes per second
    private int burst; // bytes
    private final Flow dummyHead; // list sentinel; dummyHead.next is the head of the queue
    private long lastTS; // time of the last refill, as in System.currentTimeMillis()
    private long tokens; // in bytes; kept as long so refill arithmetic cannot wrap around
    private long refillRemainder; // byte-milliseconds (0..999) not yet turned into a whole token

    /**
     * @param throughput refill rate in bytes per second
     * @param burst maximum number of bytes a flow may be granted in one call
     * @param initTokens number of tokens (bytes) initially available
     */
    public DiscretionaryRoundRobin (int throughput, int burst, int initTokens)
    {
        assert throughput > 0 && throughput < 500000000;
        assert burst > 1500 && burst < 500000000;
        this.throughput = throughput;
        this.burst = burst;
        tokens = initTokens;
        lastTS = System.currentTimeMillis();
        dummyHead = new Flow();
        dummyHead.next = dummyHead.prev = dummyHead;
    }

    /** @return the current refill rate in bytes per second. */
    public synchronized int getThroughput () { return throughput; }

    /** @return the current burst limit in bytes. */
    public synchronized int getBurst () { return burst; }

    /**
     * Changes the refill rate and the burst limit.
     *
     * @param t new throughput in bytes per second, must be positive
     * @param b new burst limit in bytes, must be positive
     * @throws IllegalArgumentException if either value is not positive
     */
    public synchronized void setThroughputBurst (int t, int b)
    {
        // A non-positive throughput would later divide by zero or yield negative waits.
        if (t <= 0 || b <= 0)
            throw new IllegalArgumentException("throughput and burst must be positive: " + t + ", " + b);
        throughput = t;
        burst = b;
    }

    /**
     * Creates a new flow and appends it to the tail of the queue.
     *
     * @return the new flow
     */
    public synchronized Flow createFlow ()
    {
        Flow flow = new Flow();
        enqueue(flow, false);
        return flow;
    }

    /** Links a detached flow in at the head or at the tail of the circular list. */
    private void enqueue (Flow flow, boolean head)
    {
        assert flow.prev == null && flow.next == null;
        assert Thread.holdsLock(this);
        if (head) {
            flow.next = dummyHead.next;
            flow.prev = dummyHead;
            dummyHead.next = flow;
            flow.next.prev = flow;
        } else {
            flow.next = dummyHead;
            flow.prev = dummyHead.prev;
            dummyHead.prev = flow;
            flow.prev.next = flow;
        }
    }

    /** Credits the tokens earned since the last refill. Caller must hold the lock. */
    private void refill ()
    {
        assert Thread.holdsLock(this);
        long now = System.currentTimeMillis();
        long elapsed = now - lastTS;
        lastTS = now;
        if (elapsed <= 0)
            return; // no time passed, or the wall clock stepped backwards
        // Whole seconds and the sub-second part are handled separately so the
        // products stay far below Long.MAX_VALUE; the sub-token remainder is
        // carried over instead of being thrown away on every call.
        long fractional = (elapsed % 1000) * throughput + refillRemainder;
        tokens += (elapsed / 1000) * throughput + fractional / 1000;
        refillRemainder = fractional % 1000;
    }

    /** One sender sharing the bucket; obtained from {@link #createFlow()}. */
    public class Flow {
        private int queued; // bytes this flow still wants to send, 0 <= queued <= burst
        private Flow prev, next; // both null while the flow is not linked into the list

        /**
         * Check whether this flow can send size bytes.
         * @param size number of bytes to send.
         * @param minSize minimal number of bytes to send. minSize should not be larger than size. minSize should not be larger than burst.
         * @return n &gt; 0 indicating that the caller should send n bytes immediately; guaranteed minSize &lt;= n &lt;= size.
         * n &lt; 0 indicating that the caller should wait -n milliseconds and check again.
         * @throws IllegalStateException if this flow has been closed
         */
        public int send (int size, int minSize)
        {
            synchronized (DiscretionaryRoundRobin.this) {
                requireAttached();
                assert 0 < minSize && minSize <= size && minSize <= burst;
                refill();
                queued = Math.min(size, burst);
                // Traffic announced by every flow ahead of this one is served first.
                long trafficBeforeMe = 0;
                for (Flow f = dummyHead.next; f != this; f = f.next) {
                    assert f != dummyHead;
                    trafficBeforeMe += f.queued;
                }
                if (tokens < trafficBeforeMe + minSize) {
                    // Time until enough tokens exist for the flows ahead plus this request,
                    // rounded up by one millisecond and saturated to the int range.
                    long deficit = trafficBeforeMe + queued - tokens;
                    long waitMs = deficit * 1000 / throughput + 1;
                    return -(int)Math.max(1L, Math.min((long)Integer.MAX_VALUE, waitMs));
                }
                int granted = (int)Math.min(tokens - trafficBeforeMe, (long)queued);
                assert minSize <= granted && granted <= size && granted <= burst;
                // Tokens beyond what the flows ahead need plus one burst are discarded.
                tokens = Math.min(tokens, trafficBeforeMe + burst) - granted;
                queued = Math.min(size - granted, burst);
                // Having been served, go to the back of the queue.
                detach();
                enqueue(this, false);
                return granted;
            }
        }

        /**
         * Force sending size bytes regardless of burst constraints.
         * Throughput is still ensured in the long term.
         * @param size number of bytes that are sent.
         * @throws IllegalStateException if this flow has been closed
         */
        public void forceSend (int size)
        {
            synchronized (DiscretionaryRoundRobin.this) {
                requireAttached();
                tokens -= size; // may go negative; later refills pay the debt back
                queued = 0;
                detach();
                enqueue(this, false);
            }
        }

        /** Removes this flow from the queue. Calling it again has no effect. */
        public void close ()
        {
            synchronized (DiscretionaryRoundRobin.this) {
                if (prev == null)
                    return; // already closed
                detach();
                queued = 0;
            }
        }

        /** Fails fast instead of walking the circular list forever looking for an unlinked flow. */
        private void requireAttached ()
        {
            if (prev == null || next == null)
                throw new IllegalStateException("flow is closed or was never added to the queue");
        }

        /** Unlinks this flow from the list. Caller must hold the lock. */
        private void detach ()
        {
            assert prev != null && next != null;
            assert Thread.holdsLock(DiscretionaryRoundRobin.this);
            next.prev = prev;
            prev.next = next;
            prev = next = null;
        }
    }
}
/**
 * Manual demo: several threads with different packet sizes and pacing share one
 * {@link DiscretionaryRoundRobin} and print every answer they get from it.
 */
public class RRTest extends Thread {
    private static final DiscretionaryRoundRobin drr = new DiscretionaryRoundRobin(200*1024, 65536, 0);

    private final int minS; // smallest packet size in bytes
    private final int maxS; // upper bound (exclusive) of the packet size in bytes
    private final int minT; // shortest pause after a successful send, in milliseconds
    private final int maxT; // upper bound (exclusive) of that pause, in milliseconds
    private final DiscretionaryRoundRobin.Flow flow;
    private final int id; // label printed in front of every log line

    /** Starts four senders with different size and pacing profiles. */
    public static void main (String [] args)
    {
        int flowid = 1;
        new RRTest(1400, 1400, 100, 500, drr.createFlow(), flowid++).start();
        new RRTest(64, 64, 1000, 1000, drr.createFlow(), flowid++).start();
        new RRTest(60000, 120000, 1000, 2000, drr.createFlow(), flowid++).start();
        new RRTest(60000, 120000, 100, 200, drr.createFlow(), flowid++).start();
    }

    /**
     * @param minS smallest packet size in bytes
     * @param maxS upper bound of the packet size in bytes
     * @param minT shortest pause between packets in milliseconds
     * @param maxT upper bound of the pause between packets in milliseconds
     * @param flow flow this sender asks for permission
     * @param id label used in the printed output
     */
    public RRTest (int minS, int maxS, int minT, int maxT, DiscretionaryRoundRobin.Flow flow, int id)
    {
        this.minS = minS;
        this.maxS = maxS;
        this.minT = minT;
        this.maxT = maxT;
        this.flow = flow;
        this.id = id;
    }

    /**
     * Repeatedly asks the flow for permission to send a random-sized packet, prints
     * the answer, and sleeps either for the wait time the flow asked for or for a
     * random pause. Runs until the thread is interrupted.
     */
    @Override
    public void run ()
    {
        while (!Thread.currentThread().isInterrupted()) {
            int size = (int)(Math.random() * (maxS - minS) + minS);
            int time = (int)(Math.random() * (maxT - minT) + minT);
            int minSize = size < drr.getBurst() ? size : drr.getBurst();
            int retval = flow.send(size, minSize);
            System.out.println(id + "(" + size + ", " + minSize + ") " + retval);
            try {
                if (retval < 0) {
                    // Widen before negating so Integer.MIN_VALUE cannot stay negative.
                    Thread.sleep(-(long)retval);
                } else {
                    Thread.sleep(time);
                }
            } catch (InterruptedException x) {
                // Interruption is the only way to stop this loop: keep the flag set and leave.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment