Created
March 6, 2014 06:56
-
-
Save gcpantazis/9383825 to your computer and use it in GitHub Desktop.
Demonstration of using topic exchanges in AMQP (RabbitMQ) to chunk data at a remote server and route a response back to the appropriate process. For use in scalable user-facing servers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// Demo: a "client" publishes messages to a topic exchange under its own
// routing key, a "server" consumes them, and replies on a routing key derived
// from the incoming one so that only the originating client receives the
// response. Both roles run in this one process for demonstration purposes.

var amqp = require('amqp');

var connection = amqp.createConnection({
  url: process.env.RABBITMQ_EVENT_BUS
});

// Randomly generate a routing key for this process. Let's say this is for "posts".
var ID = 'posts.' + Math.random().toString(32).slice(2);

connection.on('ready', function() {
  // Topic exchange shared by the server and client halves below.
  var exchange = connection.exchange('x', {
    type: 'topic'
  });

  // This would go on the server.
  // ============================

  // Let's get the "publisher" queue that the clients will post to.
  connection.queue('pub', function(pubQueue) {
    // Now let's bind that queue to the exchange we made earlier.
    // The 'posts.#' pattern takes every message whose routing key starts with
    // 'posts.'; other combinations of words and wildcards filter further. See:
    // http://www.rabbitmq.com/tutorials/tutorial-five-ruby.html
    pubQueue.bind('x', 'posts.#');

    // ...And now let's subscribe to it. The server will now get messages from
    // any number of processes, with `deliveryInfo.routingKey` signifying the
    // point of origin.
    pubQueue.subscribe(function(message, headers, deliveryInfo) {
      // Let's simulate some actual work, and publish the result with a routing
      // key that the originating client subscribes to. The key is built from
      // the incoming message's routing key rather than this process's `ID`,
      // because a real server would not know the client's ID any other way.
      exchange.publish(
        'response.' + deliveryInfo.routingKey,
        message.data.toString() + '.yay'
      );
    });
  });

  // This would go on the client.
  // ============================

  // Each client gets its own queue to listen to.
  connection.queue('sub.' + ID, function(subQueue) {
    // Let's tell this queue to listen to responses for the ID it sends to the
    // server; so this will listen for something like `response.posts.3jkl1j43`.
    subQueue.bind('x', 'response.' + ID);

    // All that's left to do is to do something fun with the response data.
    // Let's log it out to prove that this is working.
    subQueue.subscribe(function(message, headers, deliveryInfo) {
      console.log('got data back!', deliveryInfo.routingKey, message.data.toString());
    });
  });

  // Chunk a bunch of messages at the exchange, with routing data.
  // NOTE(review): publishing starts without waiting for the queue callbacks
  // above, so the earliest messages may go out before the bindings exist —
  // confirm whether that matters for your use.
  setInterval(function() {
    exchange.publish(ID, Math.random().toString(32).slice(2));
  }, 10);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment