Skip to content

Instantly share code, notes, and snippets.

@wuyongzheng
Created February 12, 2015 09:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wuyongzheng/b7efb2f97d9650f26470 to your computer and use it in GitHub Desktop.
Save wuyongzheng/b7efb2f97d9650f26470 to your computer and use it in GitHub Desktop.
/* Discretionary Fair Queue
A DFQ tries to achieve the same criterias as in WFQ:
1. Fairness: equivelent to byte-level round-robin
2. Traffic Shaping: throughput and burst guarantee
Instead of controlling _when_ to send as in WFQ, DFQ decides whether to
send. When a sender wants to send a packet, it first calls
Flow.send(size). DFQ either returns "Yes. You can send now" or "No. Check
with me again after t seconds."
*/
import java.util.*;
public class DiscretionaryFairQueue {
private int nextFlowID = 1;
private int throughput;
private int burst;
private final LinkedList<Flow> flows = new LinkedList<Flow>();
private long lastTS; // as in System.currentTimeMillis()
private int tokens; // in bytes
/* @param throughput bytes per second
* @param burst in bytes */
public DiscretionaryFairQueue (int throughput, int burst)
{
assert throughput > 0 && throughput < 500000000;
assert burst > 1500 && burst < 500000000;
this.throughput = throughput;
this.burst = burst;
}
public int getThroughput ()
{
return throughput;
}
public int getBurst ()
{
return burst;
}
public synchronized void setThroughputBurst (int t, int b)
{
throughput = t;
burst = b;
}
public synchronized Flow createFlow ()
{
Flow flow = new Flow();
flows.add(flow);
return flow;
}
public class Flow {
private final int flowID;
private int queued; // the number of bytes a flow wants to send but asked to wait.
/* When a flow sends M bytes, each of the other N flow waiting flows gets M/N karma, upto the queued bytes.
0 <= karma <= queued
A flow with smaller (queued-karma) should send earlier.
*/
private int karma;
private Flow ()
{
flowID = nextFlowID ++;
}
/**
* Check whether this flow can send size bytes.
* @param size number of bytes to send.
* @param minSize minimal number of bytes to send. minSize should not be larger than size. minSize should not be larger than burst.
* @return n &gt; 0 indicating that the caller should send n bytes immidiately. guaranteed minSize &lt; n &lt; size.
* n &lt; 0 indicating that the caller should wait -n milliseconds and check again
*/
public int send (int size, int minSize)
{
assert 0 < minSize && minSize <= size && minSize <= burst;
synchronized (DiscretionaryFairQueue.this) {
long time = System.currentTimeMillis();
tokens += (int)(time - lastTS) * throughput / 1000;
lastTS = time;
queued = size;
if (tokens < minSize)
return -(size - tokens) * 1000 / throughput;
// count queued packets before me
int earlierQueued = 0;
int numOtherFlows = 0;
for (Flow f : flows) {
if (f == this)
continue;
if (f.queued > f.karma)
numOtherFlows ++;
if (f.queued-f.karma < queued-karma)
earlierQueued += f.queued;
}
if (tokens - earlierQueued < minSize)
return -(size + earlierQueued - tokens) * 1000 / throughput;
// Now, OK to send
int toSend = tokens - earlierQueued < size ? tokens - earlierQueued : size;
assert toSend >= minSize;
for (Flow f : flows)
if (f.queued > f.karma)
f.karma += toSend / numOtherFlows;
tokens -= toSend;
queued = size - toSend;
karma -= toSend;
if (karma < 0) karma = 0;
if (karma > queued) karma = queued;
return toSend;
}
}
/**
* Force sending size bytes regardless of burst constraints.
* Throughput and fairness are still ensured in the long term.
*/
public void forceSend (int size)
{
synchronized (DiscretionaryFairQueue.this) {
tokens -= size;
queued = karma = 0;
ArrayList<Flow> others = new ArrayList<Flow>();
for (Flow f : flows)
if (f.queued > f.karma)
others.add(f);
for (Flow f : others)
f.karma += size / others.size();
}
}
public void close ()
{
synchronized (DiscretionaryFairQueue.this) {
flows.remove(this);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment