Skip to content

Instantly share code, notes, and snippets.

@davideicardi
Last active January 26, 2016 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davideicardi/240fb44f5803a960864b to your computer and use it in GitHub Desktop.
Save davideicardi/240fb44f5803a960864b to your computer and use it in GitHub Desktop.
RabbitMq and Azure Service Bus Node.js class wrappers (RabbitMqChannel, AzureSubscription)
"use strict";
const debug = require("debug")("azureServiceBus");
const events = require("events");
const azure = require("azure");
/*
Class wrapper around Azure Service Bus Topic/Subscription API.
Usage:
let bus = require("./azureServiceBus.js");
let subscription = new bus.AzureSubscription(SERVICEBUS_CONNECTION, TOPICNAME, SUBSCRIPTIONNAME);
subscription.onMessage(YOURMSGLABEL, (msg) => {
// Your code to handle the message
});
subscription.createIfNotExists({}, (error) =>{
if (error) return console.log(error);
subscription.startReceiving();
});
*/
class AzureSubscription {
constructor(azureBusUrl, topic, subscription) {
const retryOperations = new azure.ExponentialRetryPolicyFilter();
this.waitOnceListeners = new Set();
this.receiving = false;
this.topic = topic;
this.subscription = subscription;
this.eventEmitter = new events.EventEmitter();
this.serviceBusService = azure
.createServiceBusService(azureBusUrl)
.withFilter(retryOperations);
}
receiveMessage(callback){
this.serviceBusService
.receiveSubscriptionMessage(this.topic, this.subscription, (error, receivedMessage) => {
if(error) return callback(null, null);
debug(receivedMessage);
// Parse body
// try to read the body (and check if is serialized with .NET, int this case remove extra characters)
let matches = receivedMessage.body.match(/({.*})/);
if (matches || matches.length >= 1) receivedMessage.body = JSON.parse(matches[0]);
return callback(null, receivedMessage);
});
}
createIfNotExists(options, callback){
options = options | {};
debug(`Checking subscription ${this.topic}/${this.subscription} ...`);
this.exists((error, exists) => {
if (error) return callback(error);
if (exists){
debug(`Subscription ${this.topic}/${this.subscription} already exists.`);
return callback(null);
}
this.serviceBusService
.createSubscription(this.topic, this.subscription, options, (error) => {
if (error) return callback(error);
debug(`Subscription ${this.topic}/${this.subscription} created.`);
return callback(null);
});
});
}
exists(callback){
this.serviceBusService
.getSubscription(this.topic, this.subscription, (error, result) => {
if (error) {
if (error.statusCode === 404) return callback(null, false);
return callback(error, false);
}
callback(null, true);
});
}
onMessage(msgLabel, listener){
this.eventEmitter.on(msgLabel, listener);
}
_emit(name, body){
this.eventEmitter.emit(name, body);
let listeners = this.waitOnceListeners;
for(let item of listeners){
if (item.predicate(name, body)){
item.resolve(body);
listeners.delete(item);
}
}
}
_receivingLoop(receiveInterval){
this.receiveMessage((error, msg) => {
if (error) debug(error);
if (!this.receiving) return;
if (!error && msg) {
this._emit(msg.brokerProperties.Label, msg.body);
}
setTimeout(() => this._receivingLoop(receiveInterval), receiveInterval);
});
}
startReceiving(receiveInterval){
receiveInterval = receiveInterval || 500;
if (this.receiving) return;
this.receiving = true;
debug("Start receiving messages...");
this._receivingLoop(receiveInterval);
}
stopReceiving(){
this.receiving = false;
debug("Stop receiving messages.");
}
waitOnce(predicate){
return new Promise((resolve, reject) => {
this.waitOnceListeners.add({predicate: predicate, resolve: resolve, reject: reject});
});
}
}
module.exports.AzureSubscription = AzureSubscription;
"use strict";
const debug = require("debug")("rabbitMqServiceBus");
const amqp = require('amqplib');
// Code based on:
// https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
// https://github.com/squaremo/amqp.node/blob/master/examples/tutorials/receive_logs.js
class RabbitMqChannel {
constructor(url) {
this.URL = url;
}
connect(){
return amqp.connect(this.URL)
.then((conn) => {
this.connection = conn;
return conn.createChannel();
})
.then((ch) => {
this.channel = ch;
});
}
close(){
this.connection.close();
}
subscribeToExchange(exchange, listener){
return this.channel
//.assertExchange(exchange, "fanout", {durable: false})
//.then(() => {
// return this.channel.assertQueue('', {exclusive: true});
//})
.assertQueue('', {exclusive: true})
.then((qok) => {
return this.channel.bindQueue(qok.queue, exchange, '')
.then(() => {
return qok.queue;
});
})
.then((queue) => {
return this.channel.consume(queue, listener, {noAck: true});
})
.then(() => {
debug(` [*] Waiting for ${exchange}...`);
});
}
}
module.exports.RabbitMqChannel = RabbitMqChannel;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment