Created
May 17, 2012 18:45
Revisions
-
Peter Magnusson revised this gist
Sep 18, 2012 . 2 changed files with 25 additions and 9 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,13 +6,28 @@ var rpc = new (require('./amqprpc'))(connection); connection.on("ready", function(){ console.log("ready"); var outstanding=0; //counter of outstanding requests //do a number of requests for(var i=1; i<=10 ;i+=1){ //we are about to make a request, increase counter outstanding += 1; rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){ if(err) console.error(err); else console.log("response", response); //reduce for each timeout or response outstanding-=1; isAllDone(); }); } function isAllDone() { //if no more outstanding then close connection if(outstanding === 0){ connection.end(); } } }); This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -9,7 +9,8 @@ cnn.on('ready', function(){ cnn.queue('msg_queue', function(q){ q.subscribe(function(message, headers, deliveryInfo, m){ util.log(util.format( deliveryInfo.routingKey, message)); //return index sent cnn.publish(m.replyTo, {response:"OK", index:message.index}, { contentType:'application/json', contentEncoding:'utf-8', correlationId:m.correlationId -
Peter Magnusson revised this gist
May 17, 2012 . 2 changed files with 37 additions and 0 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,18 @@ //exmaple on how to use amqprpc var amqp = require('amqp'); var connection = amqp.createConnection({host:'127.0.0.1'}); var rpc = new (require('./amqprpc'))(connection); connection.on("ready", function(){ console.log("ready"); rpc.makeRequest('msg_queue', {foo:'bar'}, function response(err, response){ if(err) console.error(err); else console.log("response", response); connection.end(); }); }); This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,19 @@ //super simple rpc server example var amqp = require('amqp') , util = require('util'); var cnn = amqp.createConnection({host:'127.0.0.1'}); cnn.on('ready', function(){ console.log("listening on msg_queue"); cnn.queue('msg_queue', function(q){ q.subscribe(function(message, headers, deliveryInfo, m){ util.log(util.format( deliveryInfo.routingKey, message)); cnn.publish(m.replyTo, {response:"OK"}, { contentType:'application/json', contentEncoding:'utf-8', correlationId:m.correlationId }); }); }); }); -
Peter Magnusson revised this gist
May 17, 2012 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -9,7 +9,7 @@ exports = module.exports = AmqpRpc; function AmqpRpc(connection){ var self = this; this.connection = connection; this.requests = {}; //hash to store request in wait for response this.response_queue = false; //plaseholder for the future queue } -
Peter Magnusson created this gist
May 17, 2012 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,79 @@ var amqp = require('amqp') , crypto = require('crypto') var TIMEOUT=2000; //time to wait for response in ms var CONTENT_TYPE='application/json'; var CONTENT_ENCODING='utf-8'; exports = module.exports = AmqpRpc; function AmqpRpc(connection){ var self = this; this.connection = connection; //= typeof(connection) != 'undefined' ? connection : amqp.createConnection(); this.requests = {}; //hash to store request in wait for response this.response_queue = false; //plaseholder for the future queue } AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){ var self = this; //generate a unique correlation id for this call var correlationId = crypto.randomBytes(16).toString('hex'); //create a timeout for what should happen if we don't get a response var tId = setTimeout(function(corr_id){ //if this ever gets called we didn't get a response in a //timely fashion callback(new Error("timeout " + corr_id)); //delete the entry from hash delete self.requests[corr_id]; }, TIMEOUT, correlationId); //create a request entry to store in a hash var entry = { callback:callback, timeout: tId //the id for the timeout so we can clear it }; //put the entry in the hash so we can match the response later self.requests[correlationId]=entry; //make sure we have a response queue self.setupResponseQueue(function(){ //put the request on a queue self.connection.publish(queue_name, content, { correlationId:correlationId, contentType:CONTENT_TYPE, contentEncoding:CONTENT_ENCODING, replyTo:self.response_queue}); }); } AmqpRpc.prototype.setupResponseQueue = function(next){ //don't mess around if we have a queue if(this.response_queue) return next(); var self = this; //create the queue self.connection.queue('', {exclusive:true}, function(q){ //store the name self.response_queue = q.name; //subscribe to messages q.subscribe(function(message, headers, deliveryInfo, m){ //get the correlationId var correlationId = m.correlationId; //is it a response to a pending request if(correlationId in self.requests){ //retreive the request entry var entry = self.requests[correlationId]; //make sure we don't timeout by clearing it clearTimeout(entry.timeout); //delete the entry from hash delete self.requests[correlationId]; //callback, no err entry.callback(null, message); } }); return next(); }); }