Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Using the CoDel Queueing Consumer
package com.rabbitmq.examples;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumerCoDel.Delivery;
import com.rabbitmq.client.QueueingConsumerCoDel;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * Example consumer that reads deliveries from a queue through a
 * {@code QueueingConsumerCoDel} and, once per full window of deliveries,
 * prints the observed consumption rate and the mean sojourn time of the
 * deliveries in that window.
 */
public class CoDelConsumer {

    /** Number of deliveries tracked in the sliding window of sojourn times. */
    private static final int WINDOW_SIZE = 1000;

    /** Pause, in milliseconds, taken for each delivery before acknowledging it. */
    private static final long PER_MESSAGE_DELAY_MILLIS = 2;

    /**
     * Connects with a default {@link ConnectionFactory}, declares the queue
     * {@code "myqueue"}, and consumes from it until an exception is thrown.
     *
     * @param args ignored
     * @throws IOException if a connection or channel operation fails
     * @throws ShutdownSignalException propagated from {@code nextDelivery()}
     * @throws ConsumerCancelledException propagated from {@code nextDelivery()}
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for a delivery or while sleeping
     */
    public static void main(String[] args) throws IOException,
            ShutdownSignalException, ConsumerCancelledException,
            InterruptedException {
        final Connection conn = new ConnectionFactory().newConnection();
        try {
            final Channel chan = conn.createChannel();
            // NOTE(review): the meaning of (true, 10, 2) is defined by
            // QueueingConsumerCoDel, which is not visible here -- confirm
            // against that class before changing them.
            final QueueingConsumerCoDel consumer = new QueueingConsumerCoDel(chan,
                    true, 10, 2);
            // Alternative kept from the original example, presumably for comparison:
            //final QueueingConsumer consumer = new QueueingConsumer(chan);
            final String queue = chan.queueDeclare("myqueue", false, false, true,
                    null).getQueue();
            //chan.basicQos(10);
            chan.basicConsume(queue, consumer);

            // Circular buffer of the most recent sojourn times; `sum` is kept
            // equal to the total of its contents.
            final long[] sojourns = new long[WINDOW_SIZE];
            int idx = 0;
            // Must be long: the buffer holds longs, and compound assignment
            // into an int would silently narrow and could overflow.
            long sum = 0;
            long then = System.currentTimeMillis();

            while (true) {
                final Delivery d = consumer.nextDelivery();

                // Replace the oldest sample with the newest, updating the total.
                sum -= sojourns[idx];
                sojourns[idx] = d.sojournTime;
                sum += sojourns[idx];
                idx += 1;

                if (idx == sojourns.length) {
                    // Window wrapped: report deliveries per second over the
                    // elapsed wall-clock time, and the mean sojourn time.
                    idx = 0;
                    final long now = System.currentTimeMillis();
                    System.out.println(""
                            + (sojourns.length / ((now - then) / 1000.0D)) + "Hz: "
                            + sum / (double) sojourns.length);
                    then = now;
                }

                Thread.sleep(PER_MESSAGE_DELAY_MILLIS);
                chan.basicAck(d.getEnvelope().getDeliveryTag(), false);
            }
        } finally {
            // The loop only exits by exception; release the connection so it
            // is not left open. Skip if it has already been shut down.
            if (conn.isOpen()) {
                conn.close();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment