Skip to content

Instantly share code, notes, and snippets.

@wajdijurry
Created May 14, 2021 12:21
Show Gist options
  • Save wajdijurry/d05383809a70cd93ed403f3f440f917d to your computer and use it in GitHub Desktop.
Save wajdijurry/d05383809a70cd93ed403f3f440f917d to your computer and use it in GitHub Desktop.
Connect to RabbitMQ in NodeJS using Promise (RPC model)
var amqp = require('amqplib');
var config = require('../config/config');
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
const connect = (url) => {
return new Promise((resolve, reject) => {
amqp.connect(url)
.then(conn => resolve(conn))
.catch(err => reject(err))
})
}
const createChannel = conn => {
return new Promise((resolve, reject) => {
conn.createChannel()
.then(channel => resolve(channel))
.catch(err => reject(err))
})
}
const channelAssertQueue = (channel) => {
return new Promise((resolve, reject) => {
channel.assertQueue('', {
exclusive: true,
autoDelete: true,
})
.then(asserted => resolve(asserted))
.catch(err => reject(err))
})
}
const sendToQueue = (channel, queueName, assertedQueue, message) => {
return new Promise((resolve, reject) => {
let correlationId = generateUuid();
channel.sendToQueue(queueName, Buffer.from(JSON.stringify(message)), {
correlationId: correlationId,
replyTo: assertedQueue
})
.then(resolve(correlationId))
.catch(err => reject(err))
})
}
const consumeQueue = (channel, assertedQueue) => {
return new Promise(resolve => {
channel.consume(assertedQueue, (msg) => {
resolve(msg)
})
})
}
const receiveResponse = (correlationId, response) => {
var result, error;
if (response.properties.correlationId == correlationId) {
console.log(' [.] Got %s', response.content.toString());
response = response.content.toString();
return result = JSON.parse(response);
}
}
const closeConnection = (connection) => {
setTimeout(function() {
connection.close();
}, 500);
}
module.exports = {
send_sync: async (queueName, message) => {
var conn = await connect(config.rabbitmq.host)
var channel = await createChannel(conn)
var assertedQueue = await channelAssertQueue(channel, queueName)
let correlationId = await sendToQueue(channel, queueName, assertedQueue.queue, message)
let response = await consumeQueue(channel, assertedQueue.queue, correlationId)
await closeConnection(conn)
return receiveResponse(correlationId, response)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment