Skip to content

Instantly share code, notes, and snippets.

@btd
Created March 6, 2022 20:41
Show Gist options
  • Save btd/743cc268ccaee0b7a8a68c25fe393d8a to your computer and use it in GitHub Desktop.
Save btd/743cc268ccaee0b7a8a68c25fe393d8a to your computer and use it in GitHub Desktop.
RabbitMQ Node.js token bucket example
import amqp from "amqplib";
// Task producer: publishes the same persistent message to the durable
// "task_queue" roughly every 100 ms, forever.

/**
 * Resolve after the given number of milliseconds.
 * @param {number} ms - Time to wait, in milliseconds.
 * @returns {Promise<void>}
 */
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

const queue = "task_queue";
const msg = "Hello World!";

await channel.assertQueue(queue, {
  durable: true,
});

while (true) {
  const canKeepWriting = channel.sendToQueue(queue, Buffer.from(msg), {
    persistent: true,
  });
  console.log(" [x] Sent '%s'", msg);

  // sendToQueue returns false when the channel's write buffer is full; the
  // channel emits 'drain' once it is safe to publish again. Waiting here
  // keeps the producer from buffering messages without bound.
  if (!canKeepWriting) {
    await new Promise((resolve) => channel.once("drain", resolve));
  }

  await delay(100);
}
import amqp from "amqplib";
// Token bucket publisher: keeps a small queue topped up with "tokens" that
// workers fetch before they are allowed to process a task.
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

// Tokens issued per second; also used as the maximum queue length.
const limit = 3;
const bucketQueue = `tokens${limit}`;
// Spacing between two tokens so that `limit` tokens are written per second.
const refillIntervalMs = 1000 / limit;

const bucketOptions = {
  durable: true,
  messageTtl: 1000,
  maxLength: limit,
};
await channel.assertQueue(bucketQueue, bucketOptions);

/**
 * Publish one token (the current ISO timestamp) to the bucket queue and
 * schedule the next call of itself after the refill interval.
 */
function writeToken() {
  const stamp = Buffer.from(new Date().toISOString());
  channel.sendToQueue(bucketQueue, stamp, { persistent: true });
  setTimeout(writeToken, refillIntervalMs);
}

console.log("Starting token bucket publisher");
writeToken();
import amqp from "amqplib";
// Rate-limited worker: consumes tasks from "task_queue" but only processes a
// task when it can fetch a token from the token bucket queue. Without a token
// the task is nack'ed so the broker requeues it for a later attempt.
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();

const queue = "task_queue";
// Name of the queue the token bucket publisher writes tokens to.
const bucketQueue = "tokens3";

await channel.assertQueue(queue, { durable: true });
// At most one unacknowledged task is delivered to this consumer at a time.
await channel.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

const consumer = await channel.consume(
  queue,
  async (msg) => {
    // amqplib calls the handler with null when the broker cancels the
    // consumer (e.g. the queue was deleted); there is nothing to ack or nack.
    if (msg === null) {
      console.warn(" [!] Consumer was cancelled by the broker");
      return;
    }

    // noAck: true makes the broker treat the token as consumed as soon as it
    // is sent. With the default (noAck: false) the token would remain
    // unacknowledged on this channel forever, because nothing ever acks it.
    const token = await channel.get(bucketQueue, { noAck: true });
    if (token) {
      console.log(" [x] Received %s", msg.content.toString(), new Date());
      channel.ack(msg);
    } else {
      // The bucket is empty (rate limit reached): return the task to the
      // queue so it is redelivered and retried.
      channel.nack(msg);
    }
  },
  { noAck: false }
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment