Last active
August 29, 2015 14:15
amqpreconnection.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
/** | |
* dependencies | |
*/ | |
var amqp = require('amqplib'), | |
when = require('when'); | |
/** | |
* Sender Class, initialized with amqp address | |
* @param {string} amqpAddr The amqp address | |
* @param {string} key An argument that represent routing key. | |
* @param {object} msg Message wrapped by protocol buffer. | |
* @constructor | |
*/ | |
var Sender = function( amqpAddr, key, msg ) { | |
/** | |
* amqp address | |
* @type {!string} | |
* @private | |
*/ | |
this.addr_ = amqpAddr || 'amqp://localhost'; | |
/** | |
* amqp routing key | |
* @type {!Object} | |
* @private | |
*/ | |
this.key_ = key; | |
/** | |
* messages wrapped by protocol buffer | |
* @type {!Object} | |
* @private | |
*/ | |
this.msg_ = msg; | |
/** | |
* amqp channel | |
* @type {!Object} | |
* @private | |
*/ | |
this.ch_ = {}; | |
/** | |
* amqp connection | |
* @type {!Object} | |
* @private | |
*/ | |
this.con_ = {}; | |
this.ex_ = 'BS'; | |
}; | |
/** | |
* create channel to amqp. | |
* @param {Object} con Amqp connection | |
*/ | |
Sender.prototype.createChannel_ = function( con ) { | |
this.con_ = con; | |
return this.con_.createConfirmChannel(); | |
}; | |
/** | |
* create exchange for amqp in direct way. | |
* @param {Object} ch Amqp channel | |
*/ | |
Sender.prototype.createExchange_ = function( ch ) { | |
this.ch_ = ch; | |
return this.ch_.assertExchange( this.ex_ , 'direct', { durable: true } ); | |
}; | |
/** | |
* create fake channel for amqp | |
*/ | |
Sender.prototype.createFakeChannel_ = function() { | |
return this.ch_ = { | |
publish_: function() { | |
return null; | |
} | |
}; | |
}; | |
/** | |
* Handle unrouteable message | |
*/ | |
Sender.prototype.handleUnrouteableMessages_ = function() { | |
return this.ch_.on('return', function( msg ) { | |
return console.log( 'Message returned to sender ' + msg.content ); | |
}); | |
}; | |
/** | |
* Handle amqp disconenction and try to reconnect. | |
*/ | |
Sender.prototype.handleDisconnections_ = function() { | |
return this.con_.on('error', (function( that ) { | |
return function(e) { | |
return that.reconnect_( e ); | |
}; | |
})(this)); | |
}; | |
/** | |
* Reconnect to amqp | |
* @param {} err Amqp disconnection error | |
*/ | |
Sender.prototype.reconnect_ = function( err ) { | |
console.error('MessageBus disconnected, attempting to reconnect' + err); | |
this.createFakeChannel_(); | |
return setTimeout( this.deliverMessage.bind(this), 3000); | |
}; | |
/** | |
* Publish message to amqp | |
* | |
*/ | |
Sender.prototype.publish_ = function() { | |
var self = this; | |
this.ch_.publish( this.ex_, this.key_, this.msg_, { deliveryMode: 2, mandatory: true }, function() { | |
console.log('message processed'); | |
return self.ch_.close(); | |
}); | |
}; | |
/** | |
* Try to connect to amqp, handle failure then deliver message to amqp. | |
* @public | |
*/ | |
Sender.prototype.deliverMessage = function() { | |
when(amqp.connect( this.addr_ )) | |
.with( this ) | |
.then( this.createChannel_ ) | |
.then( this.createExchange_ ) | |
.then( this.handleUnrouteableMessages_ ) | |
.then( this.handleDisconnections_ ) | |
.catch( this.reconnect_ ) | |
.done( this.publish_ ); | |
}; | |
/** | |
* exports | |
*/ | |
module.exports = exports = Sender; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment