Skip to content

Instantly share code, notes, and snippets.

@kmpm
Created May 17, 2012 18:45

Revisions

  1. Peter Magnusson revised this gist Sep 18, 2012. 2 changed files with 25 additions and 9 deletions.
    31 changes: 23 additions & 8 deletions client.js
    Original file line number Diff line number Diff line change
    @@ -6,13 +6,28 @@ var rpc = new (require('./amqprpc'))(connection);

    connection.on("ready", function(){
    console.log("ready");

    rpc.makeRequest('msg_queue', {foo:'bar'}, function response(err, response){
    if(err)
    console.error(err);
    else
    console.log("response", response);
    connection.end();
    });
    var outstanding=0; //counter of outstanding requests

    //do a number of requests
    for(var i=1; i<=10 ;i+=1){
    //we are about to make a request, increase counter
    outstanding += 1;
    rpc.makeRequest('msg_queue', {foo:'bar', index:outstanding}, function response(err, response){
    if(err)
    console.error(err);
    else
    console.log("response", response);
    //reduce for each timeout or response
    outstanding-=1;
    isAllDone();
    });
    }

    function isAllDone() {
    //if no more outstanding then close connection
    if(outstanding === 0){
    connection.end();
    }
    }

    });
    3 changes: 2 additions & 1 deletion server.js
    Original file line number Diff line number Diff line change
    @@ -9,7 +9,8 @@ cnn.on('ready', function(){
    cnn.queue('msg_queue', function(q){
    q.subscribe(function(message, headers, deliveryInfo, m){
    util.log(util.format( deliveryInfo.routingKey, message));
    cnn.publish(m.replyTo, {response:"OK"}, {
    //return index sent
    cnn.publish(m.replyTo, {response:"OK", index:message.index}, {
    contentType:'application/json',
    contentEncoding:'utf-8',
    correlationId:m.correlationId
  2. Peter Magnusson revised this gist May 17, 2012. 2 changed files with 37 additions and 0 deletions.
    18 changes: 18 additions & 0 deletions client.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,18 @@
    //exmaple on how to use amqprpc
    var amqp = require('amqp');
    var connection = amqp.createConnection({host:'127.0.0.1'});

    var rpc = new (require('./amqprpc'))(connection);

    connection.on("ready", function(){
    console.log("ready");

    rpc.makeRequest('msg_queue', {foo:'bar'}, function response(err, response){
    if(err)
    console.error(err);
    else
    console.log("response", response);
    connection.end();
    });

    });
    19 changes: 19 additions & 0 deletions server.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,19 @@
    //super simple rpc server example
    var amqp = require('amqp')
    , util = require('util');

    var cnn = amqp.createConnection({host:'127.0.0.1'});

    cnn.on('ready', function(){
    console.log("listening on msg_queue");
    cnn.queue('msg_queue', function(q){
    q.subscribe(function(message, headers, deliveryInfo, m){
    util.log(util.format( deliveryInfo.routingKey, message));
    cnn.publish(m.replyTo, {response:"OK"}, {
    contentType:'application/json',
    contentEncoding:'utf-8',
    correlationId:m.correlationId
    });
    });
    });
    });
  3. Peter Magnusson revised this gist May 17, 2012. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion amqprpc.js
    Original file line number Diff line number Diff line change
    @@ -9,7 +9,7 @@ exports = module.exports = AmqpRpc;

    function AmqpRpc(connection){
    var self = this;
    this.connection = connection; //= typeof(connection) != 'undefined' ? connection : amqp.createConnection();
    this.connection = connection;
    this.requests = {}; //hash to store request in wait for response
    this.response_queue = false; //plaseholder for the future queue
    }
  4. Peter Magnusson created this gist May 17, 2012.
    79 changes: 79 additions & 0 deletions amqprpc.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,79 @@
    var amqp = require('amqp')
    , crypto = require('crypto')

    var TIMEOUT=2000; //time to wait for response in ms
    var CONTENT_TYPE='application/json';
    var CONTENT_ENCODING='utf-8';

    exports = module.exports = AmqpRpc;

    function AmqpRpc(connection){
    var self = this;
    this.connection = connection; //= typeof(connection) != 'undefined' ? connection : amqp.createConnection();
    this.requests = {}; //hash to store request in wait for response
    this.response_queue = false; //plaseholder for the future queue
    }

    AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
    var self = this;
    //generate a unique correlation id for this call
    var correlationId = crypto.randomBytes(16).toString('hex');

    //create a timeout for what should happen if we don't get a response
    var tId = setTimeout(function(corr_id){
    //if this ever gets called we didn't get a response in a
    //timely fashion
    callback(new Error("timeout " + corr_id));
    //delete the entry from hash
    delete self.requests[corr_id];
    }, TIMEOUT, correlationId);

    //create a request entry to store in a hash
    var entry = {
    callback:callback,
    timeout: tId //the id for the timeout so we can clear it
    };

    //put the entry in the hash so we can match the response later
    self.requests[correlationId]=entry;

    //make sure we have a response queue
    self.setupResponseQueue(function(){
    //put the request on a queue
    self.connection.publish(queue_name, content, {
    correlationId:correlationId,
    contentType:CONTENT_TYPE,
    contentEncoding:CONTENT_ENCODING,
    replyTo:self.response_queue});
    });
    }


    AmqpRpc.prototype.setupResponseQueue = function(next){
    //don't mess around if we have a queue
    if(this.response_queue) return next();

    var self = this;
    //create the queue
    self.connection.queue('', {exclusive:true}, function(q){
    //store the name
    self.response_queue = q.name;
    //subscribe to messages
    q.subscribe(function(message, headers, deliveryInfo, m){
    //get the correlationId
    var correlationId = m.correlationId;
    //is it a response to a pending request
    if(correlationId in self.requests){
    //retreive the request entry
    var entry = self.requests[correlationId];
    //make sure we don't timeout by clearing it
    clearTimeout(entry.timeout);
    //delete the entry from hash
    delete self.requests[correlationId];
    //callback, no err
    entry.callback(null, message);
    }
    });
    return next();
    });
    }