Created
February 12, 2015 09:53
-
-
Save wuyongzheng/b7efb2f97d9650f26470 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Discretionary Fair Queue
A DFQ tries to achieve the same criteria as WFQ:
1. Fairness: equivalent to byte-level round-robin
2. Traffic Shaping: throughput and burst guarantee
Instead of controlling _when_ to send as in WFQ, DFQ decides whether to
send. When a sender wants to send a packet, it first calls
Flow.send(size, minSize). DFQ either returns "Yes. You can send now" or
"No. Check with me again after t milliseconds."
*/
import java.util.*; | |
public class DiscretionaryFairQueue { | |
private int nextFlowID = 1; | |
private int throughput; | |
private int burst; | |
private final LinkedList<Flow> flows = new LinkedList<Flow>(); | |
private long lastTS; // as in System.currentTimeMillis() | |
private int tokens; // in bytes | |
/* @param throughput bytes per second | |
* @param burst in bytes */ | |
public DiscretionaryFairQueue (int throughput, int burst) | |
{ | |
assert throughput > 0 && throughput < 500000000; | |
assert burst > 1500 && burst < 500000000; | |
this.throughput = throughput; | |
this.burst = burst; | |
} | |
public int getThroughput () | |
{ | |
return throughput; | |
} | |
public int getBurst () | |
{ | |
return burst; | |
} | |
public synchronized void setThroughputBurst (int t, int b) | |
{ | |
throughput = t; | |
burst = b; | |
} | |
public synchronized Flow createFlow () | |
{ | |
Flow flow = new Flow(); | |
flows.add(flow); | |
return flow; | |
} | |
public class Flow { | |
private final int flowID; | |
private int queued; // the number of bytes a flow wants to send but asked to wait. | |
/* When a flow sends M bytes, each of the other N flow waiting flows gets M/N karma, upto the queued bytes. | |
0 <= karma <= queued | |
A flow with smaller (queued-karma) should send earlier. | |
*/ | |
private int karma; | |
private Flow () | |
{ | |
flowID = nextFlowID ++; | |
} | |
/** | |
* Check whether this flow can send size bytes. | |
* @param size number of bytes to send. | |
* @param minSize minimal number of bytes to send. minSize should not be larger than size. minSize should not be larger than burst. | |
* @return n > 0 indicating that the caller should send n bytes immidiately. guaranteed minSize < n < size. | |
* n < 0 indicating that the caller should wait -n milliseconds and check again | |
*/ | |
public int send (int size, int minSize) | |
{ | |
assert 0 < minSize && minSize <= size && minSize <= burst; | |
synchronized (DiscretionaryFairQueue.this) { | |
long time = System.currentTimeMillis(); | |
tokens += (int)(time - lastTS) * throughput / 1000; | |
lastTS = time; | |
queued = size; | |
if (tokens < minSize) | |
return -(size - tokens) * 1000 / throughput; | |
// count queued packets before me | |
int earlierQueued = 0; | |
int numOtherFlows = 0; | |
for (Flow f : flows) { | |
if (f == this) | |
continue; | |
if (f.queued > f.karma) | |
numOtherFlows ++; | |
if (f.queued-f.karma < queued-karma) | |
earlierQueued += f.queued; | |
} | |
if (tokens - earlierQueued < minSize) | |
return -(size + earlierQueued - tokens) * 1000 / throughput; | |
// Now, OK to send | |
int toSend = tokens - earlierQueued < size ? tokens - earlierQueued : size; | |
assert toSend >= minSize; | |
for (Flow f : flows) | |
if (f.queued > f.karma) | |
f.karma += toSend / numOtherFlows; | |
tokens -= toSend; | |
queued = size - toSend; | |
karma -= toSend; | |
if (karma < 0) karma = 0; | |
if (karma > queued) karma = queued; | |
return toSend; | |
} | |
} | |
/** | |
* Force sending size bytes regardless of burst constraints. | |
* Throughput and fairness are still ensured in the long term. | |
*/ | |
public void forceSend (int size) | |
{ | |
synchronized (DiscretionaryFairQueue.this) { | |
tokens -= size; | |
queued = karma = 0; | |
ArrayList<Flow> others = new ArrayList<Flow>(); | |
for (Flow f : flows) | |
if (f.queued > f.karma) | |
others.add(f); | |
for (Flow f : others) | |
f.karma += size / others.size(); | |
} | |
} | |
public void close () | |
{ | |
synchronized (DiscretionaryFairQueue.this) { | |
flows.remove(this); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment