Skip to content

Instantly share code, notes, and snippets.

@repiatx
Last active May 23, 2022 10:50
Show Gist options
  • Save repiatx/43eb059e22ad81160545d4e7e7434604 to your computer and use it in GitHub Desktop.
Save repiatx/43eb059e22ad81160545d4e7e7434604 to your computer and use it in GitHub Desktop.
RabbitMQService With logger
const pino = require('pino')
const transport = pino.transport({
target: 'pino-mongodb',
level: 'info',
options: {
uri: process.env.MONGO_URI,
collection: process.env.PREFIX + 'logs'
}
})
const streams = [
{stream: transport},
{stream: process.stdout}
]
const hlog = pino(
{
level: 'info'
},
pino.multistream(streams)
)
module.exports = hlog
const amqp = require('amqp-connection-manager')
const hlog = require('./Logger')
const Runner = require('../jobs/RabbitRunner')
class RabbitMQConnection {
constructor() {
this.channelWrapper = null
}
consumeFunction = async (msg) => {
Runner.Start(msg, this.channelWrapper)
}
async connect() {
hlog.warn('RabbitMQ Connnectiing...')
const rabbitConn = await amqp.connect([process.env.RABBITMQ_URI], {
heartbeatIntervalInSeconds: 60
})
rabbitConn.on('connect', () => {
hlog.info('RabbitMQ connected')
})
rabbitConn.on('disconnect', (err) => {
hlog.error(err, 'RabbitMQ Disconnected')
process.exit()
})
this.channelWrapper = rabbitConn.createChannel({
json: true
})
await this.channelWrapper.addSetup((channel) => {
const productQueueName = process.env.PREFIX + process.env.RABBITMQ_PRODUCT_QUEUE
const tokenQueueName = process.env.PREFIX + process.env.RABBITMQ_TOKEN_QUEUE
const concurrentJobCount = 10
return Promise.all([
channel.assertQueue(productQueueName),
channel.assertQueue(tokenQueueName),
channel.prefetch(concurrentJobCount),
channel.consume(productQueueName, this.consumeFunction),
channel.consume(tokenQueueName, this.consumeFunction)
])
})
await this.channelWrapper.waitForConnect()
}
}
module.exports = new RabbitMQConnection()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment