Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 14:15
'use strict';
* dependencies
var amqp = require('amqplib'),
when = require('when');
* Sender Class, initialized with amqp address
* @param {string} amqpAddr The amqp address
* @param {string} key An argument that represent routing key.
* @param {object} msg Message wrapped by protocol buffer.
* @constructor
var Sender = function( amqpAddr, key, msg ) {
* amqp address
* @type {!string}
* @private
this.addr_ = amqpAddr || 'amqp://localhost';
* amqp routing key
* @type {!Object}
* @private
this.key_ = key;
* messages wrapped by protocol buffer
* @type {!Object}
* @private
this.msg_ = msg;
* amqp channel
* @type {!Object}
* @private
this.ch_ = {};
* amqp connection
* @type {!Object}
* @private
this.con_ = {};
this.ex_ = 'BS';
* create channel to amqp.
* @param {Object} con Amqp connection
Sender.prototype.createChannel_ = function( con ) {
this.con_ = con;
return this.con_.createConfirmChannel();
* create exchange for amqp in direct way.
* @param {Object} ch Amqp channel
Sender.prototype.createExchange_ = function( ch ) {
this.ch_ = ch;
return this.ch_.assertExchange( this.ex_ , 'direct', { durable: true } );
* create fake channel for amqp
Sender.prototype.createFakeChannel_ = function() {
return this.ch_ = {
publish_: function() {
return null;
* Handle unrouteable message
Sender.prototype.handleUnrouteableMessages_ = function() {
return this.ch_.on('return', function( msg ) {
return console.log( 'Message returned to sender ' + msg.content );
* Handle amqp disconenction and try to reconnect.
Sender.prototype.handleDisconnections_ = function() {
return this.con_.on('error', (function( that ) {
return function(e) {
return that.reconnect_( e );
* Reconnect to amqp
* @param {} err Amqp disconnection error
Sender.prototype.reconnect_ = function( err ) {
console.error('MessageBus disconnected, attempting to reconnect' + err);
return setTimeout( this.deliverMessage.bind(this), 3000);
* Publish message to amqp
Sender.prototype.publish_ = function() {
var self = this;
this.ch_.publish( this.ex_, this.key_, this.msg_, { deliveryMode: 2, mandatory: true }, function() {
console.log('message processed');
return self.ch_.close();
* Try to connect to amqp, handle failure then deliver message to amqp.
* @public
Sender.prototype.deliverMessage = function() {
when(amqp.connect( this.addr_ ))
.with( this )
.then( this.createChannel_ )
.then( this.createExchange_ )
.then( this.handleUnrouteableMessages_ )
.then( this.handleDisconnections_ )
.catch( this.reconnect_ )
.done( this.publish_ );
* exports
module.exports = exports = Sender;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment