Skip to content

Instantly share code, notes, and snippets.

@richzw
Last active August 29, 2015 14:15
amqpreconnection.js
'use strict';
/**
* dependencies
*/
var amqp = require('amqplib'),
when = require('when');
/**
 * Sender bundles everything needed to deliver a single message to an AMQP
 * exchange: the broker address, the routing key, the payload, and the
 * connection/channel handles that are filled in once a connection is made.
 *
 * @param {string} amqpAddr Broker URL; any falsy value selects 'amqp://localhost'.
 * @param {string} key Routing key the message is published with.
 * @param {Object} msg Message payload (per the original author, a protocol-buffer
 *     encoded message).
 * @constructor
 */
var Sender = function( amqpAddr, key, msg ) {
  /**
   * Broker address used when connecting.
   * @type {string}
   * @private
   */
  this.addr_ = amqpAddr ? amqpAddr : 'amqp://localhost';

  /**
   * Routing key passed to the channel's publish call.
   * @type {string}
   * @private
   */
  this.key_ = key;

  /**
   * Payload passed to the channel's publish call.
   * @type {Object}
   * @private
   */
  this.msg_ = msg;

  /**
   * Current channel; an empty placeholder until createExchange_ stores one.
   * @type {!Object}
   * @private
   */
  this.ch_ = {};

  /**
   * Current connection; an empty placeholder until createChannel_ stores one.
   * @type {!Object}
   * @private
   */
  this.con_ = {};

  /**
   * Name of the exchange that is asserted and published to.
   * @type {string}
   * @private
   */
  this.ex_ = 'BS';
};
/**
 * Remember the connection and ask it for a confirm channel.
 * @param {Object} con AMQP connection; must provide createConfirmChannel().
 * @return {*} Whatever con.createConfirmChannel() returns (with amqplib's
 *     promise API this is expected to be a promise for the channel).
 * @private
 */
Sender.prototype.createChannel_ = function( con ) {
  this.con_ = con;
  var pendingChannel = con.createConfirmChannel();
  return pendingChannel;
};
/**
 * Remember the channel and assert the durable, direct exchange named by
 * this.ex_ on it.
 * @param {Object} ch AMQP channel; must provide assertExchange().
 * @return {*} Whatever ch.assertExchange() returns.
 * @private
 */
Sender.prototype.createExchange_ = function( ch ) {
  var exchangeOptions = { durable: true };
  this.ch_ = ch;
  return ch.assertExchange( this.ex_, 'direct', exchangeOptions );
};
/**
 * Replace the current channel with an inert stand-in so that code touching
 * this.ch_ while no real channel is available does not throw.
 *
 * The stand-in must expose the methods this file actually calls on a
 * channel: publish() (called by publish_) and close() (called by the
 * publish confirmation callback). The stub's publish() never invokes the
 * confirmation callback, because nothing was sent.
 *
 * @return {!Object} The stand-in channel, which is also stored in this.ch_.
 * @private
 */
Sender.prototype.createFakeChannel_ = function() {
  var noop = function() {
    return null;
  };
  this.ch_ = {
    publish: noop,
    // Historical name of the stub method; kept so existing users of the
    // fake channel keep working.
    publish_: noop,
    close: noop
  };
  return this.ch_;
};
/**
 * Subscribe to the current channel's 'return' event and log the content of
 * every message handed back. (In amqplib, 'return' is how a channel reports
 * a mandatory message that could not be routed.)
 * @return {*} Whatever this.ch_.on() returns.
 * @private
 */
Sender.prototype.handleUnrouteableMessages_ = function() {
  var logReturned = function( returned ) {
    return console.log( 'Message returned to sender ' + returned.content );
  };
  return this.ch_.on( 'return', logReturned );
};
/**
 * Subscribe to the current connection's 'error' event and route every
 * error to reconnect_ on this sender.
 * @return {*} Whatever this.con_.on() returns.
 * @private
 */
Sender.prototype.handleDisconnections_ = function() {
  var self = this;

  // reconnect_ is looked up when the error fires, not when the listener is
  // registered.
  function onConnectionError( e ) {
    return self.reconnect_( e );
  }

  return this.con_.on( 'error', onConnectionError );
};
/**
 * Log a connection failure, park the sender on a fake channel, and schedule
 * another delivery attempt in 3 seconds.
 * @param {*} err The failure that triggered the reconnect; it is appended to
 *     the log line via string concatenation.
 * @return {*} The handle returned by setTimeout, so a caller can cancel the
 *     pending retry.
 * @private
 */
Sender.prototype.reconnect_ = function( err ) {
  // Separate the fixed text from the error so the log line stays readable.
  console.error( 'MessageBus disconnected, attempting to reconnect: ' + err );
  this.createFakeChannel_();
  return setTimeout( this.deliverMessage.bind( this ), 3000 );
};
/**
 * Publish this.msg_ to exchange this.ex_ with routing key this.key_ on the
 * current channel, as a persistent (deliveryMode 2), mandatory message, and
 * close that channel once the publish has been confirmed or rejected.
 *
 * The confirmation callback receives an error as its first argument when the
 * message was not accepted (per amqplib's ConfirmChannel#publish contract);
 * that case is logged as an error instead of being reported as processed.
 *
 * @private
 */
Sender.prototype.publish_ = function() {
  // Capture the channel used for this publish: this.ch_ may be replaced
  // (e.g. by a fake channel during a reconnect) before the confirmation
  // arrives, and the channel to close is the one the message was sent on.
  var channel = this.ch_;
  var options = { deliveryMode: 2, mandatory: true };

  channel.publish( this.ex_, this.key_, this.msg_, options, function( err ) {
    if ( err ) {
      console.error( 'message was not confirmed: ' + err );
    } else {
      console.log( 'message processed' );
    }
    return channel.close();
  });
};
/**
 * Connect to the broker at this.addr_, set up the channel, exchange and
 * event handlers, then publish the message.
 *
 * The message is published only when every setup step succeeded. If
 * connecting or any setup step fails, reconnect_ is invoked with the failure
 * instead (it schedules another deliverMessage call) and nothing is
 * published on this attempt. An error thrown by publish_ or reconnect_
 * themselves is not retried; the final done() surfaces it rather than
 * letting it vanish in the promise chain.
 *
 * @public
 */
Sender.prototype.deliverMessage = function() {
  when( amqp.connect( this.addr_ ) )
    .with( this )
    .then( this.createChannel_ )
    .then( this.createExchange_ )
    .then( this.handleUnrouteableMessages_ )
    .then( this.handleDisconnections_ )
    // Success -> publish; failure of any earlier step -> reconnect.
    // Using the two-argument form keeps publish_ off the failure path.
    .then( this.publish_, this.reconnect_ )
    .done();
};
/**
 * Export the Sender constructor as this module's sole export, so that
 * require()-ing this file yields Sender itself. The local `exports` alias
 * is rebound to the same function.
 */
module.exports = exports = Sender;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment