Skip to content

Instantly share code, notes, and snippets.

@homerquan
Forked from tlhunter/rmq-pub.js
Last active August 29, 2015 14:19
Show Gist options
  • Save homerquan/77738740c5835e3f93f9 to your computer and use it in GitHub Desktop.
Save homerquan/77738740c5835e3f93f9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env node
var amqp = require('amqplib');
var exchange_name = 'pubsub';
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(channel) {
var chok = channel.assertExchange(exchange_name, 'direct', {
durable: true
});
chok.then(function() {
for (var i = 1; i < 100000; i++) {
sender(i);
}
return sender('FIN');
});
return chok;
function sender(i) {
channel.publish(exchange_name, '', new Buffer(JSON.stringify({
message: i
})));
}
});
}).then(null, console.warn);
#!/usr/bin/env node
var amqp = require('amqplib');
var os = require('os');
var queue_name = process.pid + '@' + os.hostname();
var exchange_name = 'pubsub';
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange_name, 'direct', {
durable: true
});
ok = ok.then(function() {
return ch.assertQueue(queue_name, {
exclusive: true
});
});
ok = ok.then(function(qok) {
return ch.bindQueue(qok.queue, exchange_name, '').then(function() {
return qok.queue;
});
});
ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {
noAck: false
});
});
return ok.then(function() {
console.log('Waiting for messages delivered to queue: ' + queue_name);
});
function logMessage(msg) {
console.log(msg.content.toString());
ch.ack(msg);
}
});
}).then(null, console.warn);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment