@gcpantazis
Created March 6, 2014 06:56
Demonstration of using topic exchanges in AMQP (RabbitMQ) to send data to a remote server and route the response back to the process that sent it. For use in scalable user-facing servers.
'use strict';

var amqp = require('amqp');

var connection = amqp.createConnection({
  url: process.env.RABBITMQ_EVENT_BUS
});

// Randomly generate a routing key for this process. Let's say this is for "posts".
var ID = 'posts.' + Math.random().toString(32).substr(2);

connection.on('ready', function() {
  var exchange = connection.exchange('x', {
    type: 'topic'
  });

  // This would go on the server.
  // ============================

  // Let's get the "publisher" queue that the clients will post to.
  connection.queue('pub', function(pubQueue) {
    // Now let's bind that queue to the exchange we made earlier.
    // 'posts.#' tells the queue to take every message whose routing key starts
    // with 'posts.'; other combinations of '*' and '#' filter further. See:
    // http://www.rabbitmq.com/tutorials/tutorial-five-ruby.html
    pubQueue.bind('x', 'posts.#');

    // ...And now let's subscribe to it. The server will now get messages from any
    // number of processes, with `deliveryInfo.routingKey` signifying the point of origin.
    pubQueue.subscribe(function(message, headers, deliveryInfo) {
      // Let's simulate some actual work, then publish the result with a routing key
      // derived from the sender's key, so only the originating client receives it.
      // (Using the local `ID` here would only work while client and server share a process.)
      exchange.publish('response.' + deliveryInfo.routingKey, message.data.toString() + '.yay');
    });
  });

  // This would go on the client.
  // ============================

  // Each client gets its own queue to listen on.
  connection.queue('sub.' + ID, function(subQueue) {
    // Let's tell this queue to listen for responses to the ID it sends to the server;
    // so this will listen for something like `response.posts.3jkl1j43`.
    subQueue.bind('x', 'response.' + ID);

    // All that's left is to do something fun with the response data. Let's log it
    // to prove that this is working.
    subQueue.subscribe(function(message, headers, deliveryInfo) {
      console.log('got data back!', deliveryInfo.routingKey, message.data.toString());
    });
  });

  // Publish a steady stream of messages to the exchange, using this process's ID
  // as the routing key.
  setInterval(function() {
    exchange.publish(ID, Math.random().toString(32).substr(2));
  }, 10);
});
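
To see why the 'posts.#' and 'response.<ID>' bindings keep requests and responses apart, here is a minimal sketch that simulates topic routing in memory, with no broker and no `amqp` module. The names `topicMatches`, `createExchange` and `runDemo` are hypothetical helpers made up for this illustration, not part of any library; the matcher only assumes the standard AMQP topic rules ('*' matches exactly one dot-separated word, '#' matches zero or more).

```javascript
'use strict';

// Hypothetical helper: does a topic binding pattern match a routing key?
// '*' matches exactly one dot-separated word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  function match(p, k) {
    if (p.length === 0) return k.length === 0;
    if (p[0] === '#') {
      // '#' either absorbs nothing, or absorbs one word and tries again.
      return match(p.slice(1), k) || (k.length > 0 && match(p, k.slice(1)));
    }
    if (k.length === 0) return false;
    return (p[0] === '*' || p[0] === k[0]) && match(p.slice(1), k.slice(1));
  }
  return match(pattern.split('.'), routingKey.split('.'));
}

// Hypothetical in-memory stand-in for a topic exchange (not RabbitMQ).
function createExchange() {
  var bindings = [];
  return {
    bind: function(pattern, handler) {
      bindings.push({ pattern: pattern, handler: handler });
    },
    publish: function(routingKey, body) {
      bindings.forEach(function(b) {
        if (topicMatches(b.pattern, routingKey)) b.handler(body, routingKey);
      });
    }
  };
}

function runDemo() {
  var exchange = createExchange();
  var received = { a: [], b: [] };

  // "Server": take every posts.* request and reply on a key derived from the sender's key.
  exchange.bind('posts.#', function(body, routingKey) {
    exchange.publish('response.' + routingKey, body + '.yay');
  });

  // "Clients": each one listens only for responses addressed to its own ID.
  exchange.bind('response.posts.a', function(body) { received.a.push(body); });
  exchange.bind('response.posts.b', function(body) { received.b.push(body); });

  exchange.publish('posts.a', 'hello');
  exchange.publish('posts.b', 'world');
  return received;
}

console.log(runDemo());
```

Because response keys start with 'response.' rather than 'posts.', the server's 'posts.#' binding never sees its own replies, and each client only receives the responses that carry its ID.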