Created
May 17, 2012 18:45
-
-
Save kmpm/2720846 to your computer and use it in GitHub Desktop.
RPC over RabbitMQ/AMQP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var amqp = require('amqp') | |
, crypto = require('crypto') | |
//how long to wait for a response before giving up, in ms
var TIMEOUT = 2000;
//content headers sent along with every published request
var CONTENT_TYPE = 'application/json'
  , CONTENT_ENCODING = 'utf-8';

//the module's export is the AmqpRpc constructor itself
module.exports = AmqpRpc;
exports = module.exports;
/**
 * RPC client state: wraps a connection and keeps track of the requests
 * that are still waiting for a response.
 *
 * @constructor
 * @param {Object} connection - stored as `this.connection`; the prototype
 *   methods call `queue()` and `publish()` on it.
 */
function AmqpRpc(connection){
  this.connection = connection;
  //hash of pending requests keyed by correlation id. It is created without
  //a prototype so that an id such as "toString" or "constructor" can never
  //be mistaken for a pending request through an inherited property.
  this.requests = Object.create(null);
  //placeholder for the name of the response queue, set once it exists
  this.response_queue = false;
}
/**
 * Publish a request and hand the matching response to `callback`.
 *
 * A random correlation id is generated for the call and the pending request
 * is stored in `this.requests` under that id until either a response with
 * the same id arrives or TIMEOUT ms pass.
 *
 * @param queue_name - passed to `connection.publish` as the routing target
 * @param content - message body, published with the JSON content type
 * @param {Function} callback - called as `callback(err)` with an
 *   `Error("timeout <correlationId>")` when no response arrives in time;
 *   the response path calls it as `callback(null, message)`.
 */
AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
  var self = this;
  //generate a unique correlation id for this call
  var correlationId = crypto.randomBytes(16).toString('hex');

  //what should happen if we don't get a response in a timely fashion
  var tId = setTimeout(function(){
    //remove the entry BEFORE calling back, so the hash is already clean if
    //the callback throws (same order as the response path uses)
    delete self.requests[correlationId];
    callback(new Error("timeout " + correlationId));
  }, TIMEOUT);

  //store the request so the response can be matched later
  self.requests[correlationId] = {
    callback: callback,
    timeout: tId //the id for the timeout so we can clear it
  };

  //make sure we have a response queue, then put the request on its queue
  self.setupResponseQueue(function(){
    //the request may already have timed out while the response queue was
    //being set up; the caller has been told, so don't publish it anymore
    if(!Object.prototype.hasOwnProperty.call(self.requests, correlationId)){
      return;
    }
    self.connection.publish(queue_name, content, {
      correlationId: correlationId,
      contentType: CONTENT_TYPE,
      contentEncoding: CONTENT_ENCODING,
      replyTo: self.response_queue
    });
  });
};
/**
 * Make sure the response queue exists and is subscribed, then call `next`.
 *
 * The first call declares an exclusive queue with an empty name, stores the
 * name reported back in `this.response_queue` and subscribes to it. Calls
 * made while that declaration is still in flight are parked and resumed in
 * call order once the queue is ready, so only one queue is ever declared.
 * Later calls invoke `next` immediately.
 *
 * @param {Function} next - called without arguments once the queue is ready
 */
AmqpRpc.prototype.setupResponseQueue = function(next){
  var self = this;
  //don't mess around if we have a queue
  if(self.response_queue) return next();

  //a declaration is already under way: wait for it instead of creating
  //one more exclusive queue per concurrent request
  if(self._setupWaiters){
    self._setupWaiters.push(next);
    return;
  }
  self._setupWaiters = [next];

  //create the queue
  self.connection.queue('', {exclusive:true}, function(q){
    //store the name
    self.response_queue = q.name;
    //subscribe to messages
    q.subscribe(function(message, headers, deliveryInfo, m){
      var correlationId = m.correlationId;
      //only react to responses for requests that are really pending; an
      //own-property check keeps ids like "toString" from matching
      //something inherited from Object.prototype
      if(!Object.prototype.hasOwnProperty.call(self.requests, correlationId)){
        return;
      }
      //retrieve the request entry
      var entry = self.requests[correlationId];
      //make sure we don't timeout by clearing it
      clearTimeout(entry.timeout);
      //delete the entry from hash
      delete self.requests[correlationId];
      //callback, no err
      entry.callback(null, message);
    });

    //resume everyone who asked for the queue while it was being created
    var waiters = self._setupWaiters;
    self._setupWaiters = null;
    for(var i = 0; i < waiters.length; i += 1){
      waiters[i]();
    }
  });
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//example of how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'127.0.0.1'});
var AmqpRpc = require('./amqprpc');
var rpc = new AmqpRpc(connection);

connection.on("ready", function(){
  console.log("ready");

  //number of requests that have neither been answered nor timed out yet
  var outstanding = 0;

  //close the connection as soon as nothing is outstanding anymore
  function closeWhenIdle(){
    if(outstanding !== 0) return;
    connection.end();
  }

  //every response and every timeout ends up here
  function onReply(err, reply){
    if(err){
      console.error(err);
    } else {
      console.log("response", reply);
    }
    //one less to wait for, whether it was a timeout or a response
    outstanding -= 1;
    closeWhenIdle();
  }

  //send ten requests, counting each one as outstanding before it is made
  var remaining = 10;
  while(remaining > 0){
    outstanding += 1;
    rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, onReply);
    remaining -= 1;
  }
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//minimal rpc server example
var amqp = require('amqp')
  , util = require('util');

var cnn = amqp.createConnection({host:'127.0.0.1'});

//handle one incoming request: log it, then publish the reply to the queue
//named in the request's replyTo, echoing its correlationId
function handleRequest(message, headers, deliveryInfo, m){
  var logLine = util.format( deliveryInfo.routingKey, message);
  util.log(logLine);

  //return index sent
  var reply = {response:"OK", index:message.index};
  var replyOptions = {
    contentType:'application/json',
    contentEncoding:'utf-8',
    correlationId:m.correlationId
  };
  cnn.publish(m.replyTo, reply, replyOptions);
}

//start consuming as soon as the queue is available
function onQueueReady(q){
  q.subscribe(handleRequest);
}

cnn.on('ready', function(){
  console.log("listening on msg_queue");
  cnn.queue('msg_queue', onQueueReady);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I originally wrote this code as an answer to a question on Stack Overflow:
http://stackoverflow.com/questions/10524613/how-to-create-rep-req-on-rabbit-js/