public
Last active

A simple throttled blocking-queue that returns elements at a constant rate (M elements in N time)

  • Download Gist
ThrottledQueue.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
 
/**
* A simple throttled blocking-queue that returns elements at a constant rate (M
* elements in N time)
*
* @author Kamran Zafar
*
*/
public class ThrottledQueue {
// M elements
private final int mElements;
 
// N Time in milliseconds
private final long nTime;
 
// delay per queue element from last fetch
private final long delayPerElement;
 
// delay offset in milliseconds
private long delayOffset = System.currentTimeMillis();
 
// Reference start time
private final long startTime = System.nanoTime();
 
private final DelayQueue<DelayedElement> queue = new DelayQueue<DelayedElement>();
 
public ThrottledQueue(int mElements, long nTime) {
this.mElements = mElements;
this.nTime = nTime;
 
// calculate delay per element
delayPerElement = (long) Math.ceil( nTime / mElements );
}
 
/**
* Adds an element to the queue
*
* @param item
*/
public void add(Object element) {
queue.put( new DelayedElement( element ) );
}
 
/**
* Blocks till delay expires and sets the next offset
*
* @return Object
*/
public synchronized Object get() {
try {
Object obj = queue.take().getElement();
 
// set next offset
delayOffset = System.currentTimeMillis();
 
return obj;
} catch (InterruptedException e) {
e.printStackTrace();
}
 
return null;
}
 
public boolean isEmpty() {
return queue.isEmpty();
}
 
public int size() {
return queue.size();
}
 
public int getMElements() {
return mElements;
}
 
public long getNTime() {
return nTime;
}
 
public long getDelayPerElement() {
return delayPerElement;
}
 
/**
* Start time in nanoseconds; reference point
*
* @return
*/
public long getStartTime() {
return startTime;
}
 
private class DelayedElement implements Delayed {
private final Object element;
private final long insertTime; // relative to startTime
 
public DelayedElement(Object element) {
this.element = element;
 
// elapsed time in nano seconds
insertTime = System.nanoTime() - startTime;
}
 
public Object getElement() {
return element;
}
 
/**
* Returns < 0 if delay expires
*
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert( delayOffset + delayPerElement - System.currentTimeMillis(), TimeUnit.MILLISECONDS );
}
 
/**
* Compares elements in order to return them in the same order they were
* inserted
*
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override
public int compareTo(Delayed o) {
DelayedElement de = (DelayedElement) o;
if (insertTime < de.getInsertTime())
return -1;
if (insertTime > de.getInsertTime())
return 1;
 
return 0;
}
 
public long getInsertTime() {
return insertTime;
}
}
 
// Test
public static void main(String[] args) throws Exception {
// Create a throttled queue that returns 2 elements in 1 second
final ThrottledQueue q = new ThrottledQueue( 2, 1000 );
 
// some elements, e.g. SMS records
q.add( "element 1" );
q.add( "element 2" );
q.add( "element 3" );
q.add( "element 4" );
q.add( "element 5" );
q.add( "element 6" );
q.add( "element 7" );
q.add( "element 8" );
q.add( "element 9" );
q.add( "element 10" );
 
System.out.println( "Delay per element: " + q.getDelayPerElement() );
 
// A thread that consumes elements in the queue
Thread consumer = new Thread( new Runnable() {
@Override
public void run() {
while (!q.isEmpty()) {
// Do something. e.g. send SMS
System.out.println( new SimpleDateFormat( "dd/MM/yyyy HH:mm:ss:S" ).format( new Date( System
.currentTimeMillis() ) ) + " - " + q.get() );
}
}
} );
 
consumer.start();
consumer.join();
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.