Created
February 13, 2015 08:30
-
-
Save wuyongzheng/2e16797d24938b50a4f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Discretionary Round Robin (DRR)
A DRR tries to achieve the same criteria as a Round Robin Queue:
1. No starvation.
2. Traffic shaping: throughput and burst guarantee.
Instead of controlling _when_ to send, as a normal traffic-shaping queue does,
DRR decides _whether_ to send. When a sender wants to send a packet, it first
calls Flow.send(size, minSize). DRR either returns "Yes. You can send now" or
"No. Check with me again after t milliseconds."
*/
//import java.util.*; | |
/**
 * Token-bucket scheduler shared by several {@link Flow}s.
 *
 * <p>Tokens (bytes) accrue at {@code throughput} bytes per second. Flows are kept
 * in a circular doubly linked list; a flow that sends is moved to the tail, so
 * flows nearer the head have priority over the tokens currently available.
 *
 * <p>All mutable state is read and written while holding this object's monitor.
 */
public class DiscretionaryRoundRobin {
    private int throughput; // bytes per second
    private int burst; // bytes
    private final Flow dummyHead; // sentinel node; dummyHead.next is the head of the queue
    private long lastTS; // time of the last refill, as in System.currentTimeMillis()
    private int tokens; // bytes currently available
    private long carry; // thousandths of a byte earned but not yet credited to tokens

    /**
     * @param throughput token refill rate in bytes per second; must be positive
     * @param burst      upper bound, in bytes, on what one flow may queue or send at once; must be positive
     * @param initTokens number of tokens (bytes) available immediately
     * @throws IllegalArgumentException if throughput or burst is not positive
     */
    public DiscretionaryRoundRobin (int throughput, int burst, int initTokens)
    {
        assert throughput > 0 && throughput < 500000000;
        assert burst > 1500 && burst < 500000000;
        checkLimits(throughput, burst);
        this.throughput = throughput;
        this.burst = burst;
        tokens = initTokens;
        lastTS = System.currentTimeMillis();
        dummyHead = new Flow();
        dummyHead.next = dummyHead.prev = dummyHead;
    }

    /** @return the current refill rate in bytes per second. */
    public synchronized int getThroughput () { return throughput; }

    /** @return the current burst limit in bytes. */
    public synchronized int getBurst () { return burst; }

    /**
     * Replaces the refill rate and the burst limit.
     *
     * @param t new throughput in bytes per second; must be positive
     * @param b new burst limit in bytes; must be positive
     * @throws IllegalArgumentException if t or b is not positive
     */
    public synchronized void setThroughputBurst (int t, int b)
    {
        checkLimits(t, b);
        throughput = t;
        burst = b;
    }

    /**
     * Creates a new flow and appends it to the tail of the queue.
     *
     * @return the newly registered flow
     */
    public synchronized Flow createFlow ()
    {
        Flow flow = new Flow();
        enqueue(flow, false);
        return flow;
    }

    /** Rejects limits that would later cause a division by zero or a meaningless bucket. */
    private static void checkLimits (int t, int b)
    {
        if (t <= 0 || b <= 0)
            throw new IllegalArgumentException("throughput and burst must be positive: " + t + ", " + b);
    }

    /** Narrows a long to int, clamping at the int range instead of wrapping around. */
    private static int saturate (long value)
    {
        if (value > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        if (value < Integer.MIN_VALUE)
            return Integer.MIN_VALUE;
        return (int) value;
    }

    /**
     * Credits the tokens earned since the last refill. The arithmetic is done in
     * long so that a long idle period cannot overflow, and the sub-byte remainder
     * is carried to the next call so that frequent polling does not lose tokens.
     */
    private void refill ()
    {
        assert Thread.holdsLock(this);
        long now = System.currentTimeMillis();
        long earned = (now - lastTS) * throughput + carry; // in 1/1000 byte
        lastTS = now;
        carry = earned % 1000;
        tokens = saturate(tokens + earned / 1000);
    }

    /**
     * Links a detached flow into the queue, directly after the sentinel when
     * {@code head} is true and directly before it (the tail) otherwise.
     */
    private void enqueue (Flow flow, boolean head)
    {
        assert flow.prev == null && flow.next == null;
        assert Thread.holdsLock(this);
        if (head) {
            flow.next = dummyHead.next;
            flow.prev = dummyHead;
            dummyHead.next = flow;
            flow.next.prev = flow;
        } else {
            flow.next = dummyHead;
            flow.prev = dummyHead.prev;
            dummyHead.prev = flow;
            flow.prev.next = flow;
        }
    }

    /** One sender sharing the enclosing scheduler's token bucket. */
    public class Flow {
        private int queued; // 0 <= queued <= burst
        private Flow prev, next; // both null while the flow is not linked into the queue

        /**
         * Check whether this flow can send size bytes.
         * @param size number of bytes to send.
         * @param minSize minimal number of bytes to send. minSize should not be larger than size. minSize should not be larger than burst.
         * @return n > 0 indicating that the caller should send n bytes immediately. guaranteed minSize <= n <= size.
         * n < 0 indicating that the caller should wait -n milliseconds and check again
         * @throws IllegalStateException if this flow is not linked into the queue (e.g. after close())
         */
        public int send (int size, int minSize)
        {
            synchronized (DiscretionaryRoundRobin.this) {
                assert 0 < minSize && minSize <= size && minSize <= burst;
                requireAttached();
                refill();
                queued = Math.min(size, burst);

                // Everything queued by flows ahead of this one is served first.
                long trafficBeforeMe = 0;
                for (Flow f = dummyHead.next; f != this; f = f.next) {
                    assert f != dummyHead;
                    trafficBeforeMe += f.queued;
                }
                long available = tokens - trafficBeforeMe;

                if (available < minSize) {
                    // Time until the whole queued amount is covered, plus one
                    // millisecond; computed in long and clamped so the result
                    // can never wrap around to a positive value.
                    long waitMs = (queued - available) * 1000 / throughput + 1;
                    return -(int) Math.min(waitMs, Integer.MAX_VALUE);
                }

                int retval = (int) Math.min(available, queued);
                assert minSize <= retval && retval <= size && retval <= burst;
                if (available > burst)
                    tokens = saturate(trafficBeforeMe + burst - retval); // drop surplus beyond one burst
                else
                    tokens -= retval;
                queued = Math.min(size - retval, burst);
                // Having been served, this flow goes to the back of the queue.
                detach();
                enqueue(this, false);
                return retval;
            }
        }

        /**
         * Force sending size bytes regardless of burst constraints.
         * Throughput is still ensured in the long term.
         * @throws IllegalStateException if this flow is not linked into the queue (e.g. after close())
         */
        public void forceSend (int size)
        {
            synchronized (DiscretionaryRoundRobin.this) {
                requireAttached();
                tokens = saturate((long) tokens - size);
                queued = 0;
                detach();
                enqueue(this, false);
            }
        }

        /** Removes this flow from the queue. Calling it again has no effect. */
        public void close ()
        {
            synchronized (DiscretionaryRoundRobin.this) {
                if (prev != null)
                    detach();
            }
        }

        /** Fails fast instead of walking the ring forever looking for an unlinked flow. */
        private void requireAttached ()
        {
            if (prev == null || next == null)
                throw new IllegalStateException("flow is not registered with the scheduler (closed?)");
        }

        /** Unlinks this flow from its neighbours and clears its own links. */
        private void detach ()
        {
            assert prev != null && next != null;
            assert Thread.holdsLock(DiscretionaryRoundRobin.this);
            next.prev = prev;
            prev.next = next;
            prev = next = null;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RRTest extends Thread { | |
private static final DiscretionaryRoundRobin drr = new DiscretionaryRoundRobin(200*1024, 65536, 0); | |
public static void main (String [] args) | |
{ | |
int flowid = 1; | |
new RRTest(1400, 1400, 100, 500, drr.createFlow(), flowid++).start(); | |
new RRTest(64, 64, 1000, 1000, drr.createFlow(), flowid++).start(); | |
new RRTest(60000, 120000, 1000, 2000, drr.createFlow(), flowid++).start(); | |
new RRTest(60000, 120000, 100, 200, drr.createFlow(), flowid++).start(); | |
} | |
private final int minS; | |
private final int maxS; | |
private final int minT; | |
private final int maxT; | |
private final DiscretionaryRoundRobin.Flow flow; | |
private final int id; | |
public RRTest (int minS, int maxS, int minT, int maxT, DiscretionaryRoundRobin.Flow flow, int id) | |
{ | |
this.minS = minS; | |
this.maxS = maxS; | |
this.minT = minT; | |
this.maxT = maxT; | |
this.flow = flow; | |
this.id = id; | |
} | |
public void run () | |
{ | |
while (true) { | |
int size = (int)(Math.random() * (maxS - minS) + minS); | |
int time = (int)(Math.random() * (maxT - minT) + minT); | |
int minSize = size < drr.getBurst() ? size : drr.getBurst(); | |
int retval = flow.send(size, minSize); | |
System.out.println(id + "(" + size + ", " + minSize + ") " + retval); | |
try { | |
if (retval < 0) { | |
Thread.sleep(-retval); | |
} else { | |
Thread.sleep(time); | |
} | |
} catch (InterruptedException x) {} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment