Skip to content

Instantly share code, notes, and snippets.

@andersonbosa
Created June 12, 2024 13:37
Show Gist options
  • Save andersonbosa/215dc9788842c07950d6108d02b63a14 to your computer and use it in GitHub Desktop.
Save andersonbosa/215dc9788842c07950d6108d02b63a14 to your computer and use it in GitHub Desktop.
// ------------------------------- lib
import amqp from 'amqplib'
import { v4 as uuidv4 } from 'uuid'
import {
Broker,
BrokerBinding,
BrokerConfig,
BrokerExchange,
BrokerQueue,
BrokerResources
} from '../@types'
/**
 * Stateful RabbitMQ broker: a single connection and channel are opened lazily
 * and reused by every operation until the connection is closed.
 *
 * Lifecycle notes:
 * - `connect()` is safe to call concurrently; callers share one in-flight attempt.
 * - When the connection closes (explicitly, on error, or by the idle timer armed
 *   by the publish methods) the cached connection/channel are dropped, so the
 *   next operation reconnects instead of using a dead channel.
 * - The publish methods close the connection after a period without publishing.
 *   Consumers registered on the same broker instance share that connection and
 *   stop receiving messages when it closes.
 */
export class RabbitMQStatefulBroker implements Broker {
  public _connection: amqp.Connection | null = null
  public _channel: amqp.Channel | null = null
  /** Consumers keyed by queue name; each entry is `{ handler, consumer, channel }`. */
  public consumersMap = new Map<string, any>()

  /** In-flight connection attempt shared by concurrent `connect()` callers. */
  private pendingConnect: Promise<void> | null = null
  /** Timer that closes the connection once publishing has gone quiet. */
  private idleCloseTimer: ReturnType<typeof setTimeout> | null = null

  constructor (
    public config: BrokerConfig
  ) { }

  /**
   * Closes the current connection, if any, and forgets the cached connection and
   * channel so that later operations open a fresh one.
   * Close failures are logged and not rethrown (best-effort cleanup).
   */
  async close (): Promise<void> {
    this.config?.logger?.info(`[${__filename}:#close] closing connection...`)
    this.cancelIdleClose()
    const connection = this._connection
    // Drop the references before awaiting so nobody reuses a closing connection.
    this._connection = null
    this._channel = null
    if (!connection) return
    try {
      await connection.close()
    } catch (err) {
      this.config?.logger?.error(`[${__filename}:#close] error while closing connection:`, err)
    }
  }

  /**
   * Closes whatever is left of the current connection and schedules a new
   * connection attempt after `config.reconnectTime` milliseconds (default 5000).
   */
  async reconnect (): Promise<void> {
    const reconnectTime = this.config.reconnectTime ?? 5000
    this.config?.logger?.error(`[${__filename}#connect] trying to reconnect in ${reconnectTime}`)
    await this.close()
    // Arrow function keeps `this` bound when the timer fires.
    setTimeout(() => { void this.connect() }, reconnectTime)
  }

  /**
   * Opens the connection and channel unless already connected.
   * Never rejects: a failed attempt is logged and retried through `reconnect()`,
   * so callers must check `_channel` afterwards.
   */
  async connect (): Promise<void> {
    if (this.pendingConnect) return this.pendingConnect
    if (this._connection) {
      this.config?.logger?.info(`[${__filename}:#connect] looks like broker is already connected, skip`)
      return
    }
    this.pendingConnect = this.openConnection().finally(() => {
      this.pendingConnect = null
    })
    return this.pendingConnect
  }

  /**
   * Deletes every exchange and queue listed in `config.resources`.
   * Resolves once all deletions have completed; rejects if any deletion fails.
   * @throws Error when no channel could be obtained.
   */
  async destroyResources (): Promise<void> {
    const channel = await this.requireChannel()
    const exchanges = this.config.resources?.exchanges ?? []
    const queues = this.config.resources?.queues ?? []
    await Promise.all([
      ...exchanges.map(async (exchange: BrokerExchange) => {
        const deletedResource = await channel.deleteExchange(exchange.name)
        this.config?.logger?.info(`[${__filename}#destroyResources] deleted exchange = `, deletedResource)
      }),
      ...queues.map(async (queue: BrokerQueue) => {
        const deletedResource = await channel.deleteQueue(queue.name)
        this.config?.logger?.info(`[${__filename}#destroyResources] deleted queue = `, deletedResource)
      })
    ])
  }

  /**
   * Creates the exchanges, then the queues, then the bindings described by
   * `resources` (falling back to `config.resources`).
   * Resolves only after every resource has been asserted. Failures while
   * creating resources are logged and not rethrown.
   * @throws Error when no channel could be obtained.
   */
  async createResources (resources?: BrokerResources | void): Promise<void> {
    const channel = await this.requireChannel()
    try {
      const resourcesToCreate = resources ? resources : this.config.resources
      if (!resourcesToCreate) throw new Error('Resources was not specified.')

      await Promise.all(
        (resourcesToCreate.exchanges ?? []).map(async (exchange: BrokerExchange) => {
          const createdExchange = await channel.assertExchange(
            exchange.name,
            exchange.type,
            exchange.options as amqp.Options.AssertExchange
          )
          this.config?.logger?.info(`[${__filename}#createResources] created exchange = `, createdExchange)
        })
      )

      await Promise.all(
        (resourcesToCreate.queues ?? []).map(async (queue: BrokerQueue) => {
          const createdQueue = await channel.assertQueue(
            queue.name,
            queue.options as amqp.Options.AssertQueue
          )
          this.config?.logger?.info(`[${__filename}#createResources] created queue = `, createdQueue)
        })
      )

      // Bindings reference both exchanges and queues, so they go last.
      await Promise.all(
        (resourcesToCreate.binding ?? []).map(async (binding: BrokerBinding) => {
          await channel.bindQueue(
            binding.targetQueue,
            binding.exchange,
            binding.routingKeys
          )
          this.config?.logger?.info(`[${__filename}#createResources] created brinding = `, binding)
        })
      )
    } catch (error) {
      this.config?.logger?.error(`[${__filename}#createResources]:`, error)
    }
  }

  /**
   * Starts consuming `targetQueue` with `handler` (invoked with the channel as
   * `this`). Defaults to manual acknowledgement (`noAck: false`) unless
   * overridden by `options`.
   * @throws Error when no channel could be obtained, or when `consume` fails.
   */
  async addConsumer (
    targetQueue: string,
    handler: (msg: amqp.ConsumeMessage | null) => Promise<void>,
    options?: amqp.Options.Consume
  ): Promise<void> {
    const channel = await this.requireChannel()
    const consumeOptions: amqp.Options.Consume = {
      noAck: false, // true = auto ack, false = manual and should be implemented
      ...options
    }
    // Store the entry before consuming: a message may be delivered before the
    // `consume` promise continuation runs, and handlers look themselves up here.
    const entry: { handler: typeof handler, consumer: unknown, channel: amqp.Channel } = {
      handler,
      consumer: undefined,
      channel
    }
    this.consumersMap.set(targetQueue, entry)
    try {
      entry.consumer = await channel.consume(targetQueue, handler.bind(channel), consumeOptions)
    } catch (err) {
      if (this.consumersMap.get(targetQueue) === entry) this.consumersMap.delete(targetQueue)
      throw err
    }
  }

  /**
   * Publishes `message` (JSON-serialised) to `exchangeName` with `routingKey`.
   * @returns the channel's `publish` result, or `false` when there is no channel
   *          or publishing throws.
   * Side effect: the connection is closed after 5 seconds without further publishes.
   */
  async sendToExchange (
    exchangeName: string,
    routingKey: string,
    message: any,
    options?: amqp.Options.Publish
  ): Promise<boolean> {
    if (!this._connection || !this._channel) {
      await this.connect()
    }
    const publishOptions = this.buildPublishOptions(`${exchangeName}:${routingKey}`, options)
    try {
      this.config?.logger?.info(`[${__filename}] publishing message to exchange.`)
      return this._channel?.publish(
        exchangeName,
        routingKey,
        Buffer.from(JSON.stringify(message)),
        publishOptions
      ) ?? false
    } catch (error) {
      this.config.logger?.error(`[${__filename}]Error sending message to queue:`, error)
      return false
    } finally {
      this.scheduleIdleClose(5000)
    }
  }

  /**
   * Sends `message` (JSON-serialised) directly to `queueName`.
   * @returns the channel's `sendToQueue` result, or `false` when sending throws.
   * @throws Error when no channel could be obtained.
   * Side effect: the connection is closed after 3 seconds without further publishes.
   */
  async sendToQueue (queueName: string, message: any, options?: amqp.Options.Publish): Promise<boolean> {
    const channel = await this.requireChannel()
    const publishOptions = this.buildPublishOptions(`${queueName}`, options)
    try {
      this.config?.logger?.info(`[${__filename}] sending message to queue.`)
      return channel.sendToQueue(
        queueName,
        Buffer.from(JSON.stringify(message)),
        publishOptions
      )
    } catch (error) {
      this.config.logger?.error(`[${__filename}] Error sending message to queue:`, error)
      return false
    } finally {
      this.scheduleIdleClose(3000)
    }
  }

  /** Performs one connection attempt; on failure logs and hands over to `reconnect()`. */
  private async openConnection (): Promise<void> {
    try {
      const connection = await amqp.connect(this.config.connection.url)
      this._connection = connection
      // Listeners are attached before creating the channel so an early failure is observed.
      connection.on('error', (err: { message: unknown }) => {
        this.forgetConnection(connection)
        this.config?.logger?.error(`[${__filename}#onError] connection error.`, err)
      })
      connection.on('close', () => {
        this.forgetConnection(connection)
        this.config?.logger?.warn(`[${__filename}#onClose] connection closed.`)
        // this.reconnect()
      })
      this._channel = await connection.createChannel()
      this.config?.logger?.info(`[${__filename}#connect] successfully connected!`)
    } catch (err) {
      this.config?.logger?.error(`[${__filename}#connect] error while connecting:`, err)
      void this.reconnect()
    }
  }

  /** Clears the cached connection/channel, but only if `connection` is still the current one. */
  private forgetConnection (connection: amqp.Connection): void {
    if (this._connection !== connection) return
    this._connection = null
    this._channel = null
  }

  /** Connects if needed and returns the channel, throwing when none is available. */
  private async requireChannel (): Promise<amqp.Channel> {
    if (!this._connection || !this._channel) await this.connect()
    if (!this._channel) throw new Error('Channel not available.')
    return this._channel
  }

  /** Builds the default publish options for a message, overridden by `options`. */
  private buildPublishOptions (source: string, options?: amqp.Options.Publish): amqp.Options.Publish {
    return {
      timestamp: Date.now(),
      contentEncoding: 'utf-8',
      contentType: 'application/json',
      persistent: false,
      headers: {
        messageId: uuidv4(),
        source
      },
      ...options
    }
  }

  /** (Re)arms the idle timer so the connection closes `delayMs` after the latest publish. */
  private scheduleIdleClose (delayMs: number): void {
    this.cancelIdleClose()
    this.idleCloseTimer = setTimeout(() => {
      this.idleCloseTimer = null
      void this.close()
    }, delayMs)
  }

  /** Cancels a pending idle close, if one is armed. */
  private cancelIdleClose (): void {
    if (this.idleCloseTimer === null) return
    clearTimeout(this.idleCloseTimer)
    this.idleCloseTimer = null
  }
}
// -------------------------------- common
import createLogger from 'logging'
import { BrokerConfig } from '../../@types'
// Names shared by the queue declarations and their bindings below.
const TASKS_EXCHANGE = 'work.tasks.exchange'
const EXPORTER_QUEUE = 'work.exporter.queue'
const IMPORTER_QUEUE = 'work.importer.queue'
const NOTIFICATION_QUEUE = 'work.notification.queue'

// [queue, routing key] pairs; every pair is bound on the tasks exchange.
const TASK_ROUTES: Array<[string, string]> = [
  [EXPORTER_QUEUE, 'task.export'],
  [IMPORTER_QUEUE, 'task.import'],
  [NOTIFICATION_QUEUE, 'notification.#']
]

/**
 * Shared broker configuration for the examples: a local RabbitMQ instance,
 * one topic exchange, three durable queues and one binding per queue.
 */
export const brokerConfiguration: BrokerConfig = {
  logger: createLogger('logger'),
  reconnectTime: 3000,
  connection: {
    url: 'amqp://admin:admin@localhost:5672'
  },
  resources: {
    exchanges: [
      { type: 'topic', name: TASKS_EXCHANGE, options: {} }
    ],
    queues: [EXPORTER_QUEUE, IMPORTER_QUEUE, NOTIFICATION_QUEUE].map(
      (name) => ({ name, options: { durable: true } })
    ),
    binding: TASK_ROUTES.map(
      ([targetQueue, routingKeys]) => ({ exchange: TASKS_EXCHANGE, targetQueue, routingKeys })
    )
  }
}
// ------------------- usage
import createLogger from 'logging'
import { RabbitMQStatefulBroker } from '../broker_stateful'
import { brokerConfiguration } from './0_common'
// Logger dedicated to this example; it replaces the shared configuration's logger.
const logger = createLogger('example-1')

// Shallow copy of the shared configuration with only `logger` swapped out.
const exampleConfiguration = Object.assign({}, brokerConfiguration, { logger })
/**
 * Resets the broker topology: connects, deletes the configured exchanges and
 * queues, recreates them, waits three seconds, then closes the connection.
 *
 * Rejects if any step fails; the broker is closed in every case.
 */
async function SetupResources (): Promise<void> {
  const b0 = new RabbitMQStatefulBroker(exampleConfiguration)
  try {
    await b0.connect()
    await b0.destroyResources()
    await b0.createResources()
    // Grace period kept from the original flow so the broker can settle
    // before producers and consumers start.
    await new Promise<void>((resolve) => setTimeout(resolve, 3000))
    logger.info('#SetupResources: continuando...')
  } finally {
    await b0.close()
  }
}
/**
 * Starts a producer that, every 300 ms, publishes two rounds of messages:
 * one directly to the exporter queue and two through the tasks exchange
 * (an import task and a notification).
 *
 * Resolves once the interval is armed; the interval itself runs until the
 * process exits.
 */
async function RunProducers (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  // Connect once up front so the first tick's publishes do not each try to
  // open their own connection.
  await b.connect()
  setInterval(
    () => {
      for (let index = 0; index < 2; index++) {
        const publishes = [
          b.sendToQueue(
            'work.exporter.queue',
            index
          ),
          b.sendToExchange(
            'work.tasks.exchange',
            'task.import',
            index
          ),
          b.sendToExchange(
            'work.tasks.exchange',
            `notification.index-${index}`,
            index
          )
        ]
        // Publishing is fire-and-forget per tick, but a rejection must not go unhandled.
        void Promise.all(publishes).catch((err: unknown) => {
          logger.error('RunProducers: publish failed', err)
        })
      }
    },
    300
  )
}
/**
 * Consumes the exporter queue with automatic acknowledgement (`noAck: true`)
 * and logs each delivery.
 *
 * Resolves once the consumer is registered; rejects if registration fails.
 */
async function RunConsumerAutoAck (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  const QUEUE = 'work.exporter.queue'
  await b.addConsumer(
    QUEUE,
    async (msg) => {
      logger.info('RunConsumerAutoAck: ', msg?.fields.deliveryTag, msg?.properties.headers?.source, msg?.content.toString())
    },
    { noAck: true }
  )
}
/**
 * Consumes the importer queue with manual acknowledgement: each delivery is
 * logged and then acked on the broker's channel.
 *
 * Resolves once the consumer is registered; rejects if registration fails.
 */
async function RunConsumerManualAck (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  const QUEUE = 'work.importer.queue'
  await b.addConsumer(
    QUEUE,
    async (msg) => {
      logger.info('RunConsumerManualAck: ', msg?.fields.deliveryTag, msg?.properties.headers?.source, msg?.content.toString())
      // The handler can be invoked with null (the type allows it); there is nothing to ack then.
      if (!msg) return
      // Ack on the broker's own channel rather than via consumersMap, whose
      // entry may not be stored yet when the first message arrives.
      b._channel?.ack(msg)
    },
  )
}
/**
 * Consumes the notification queue in manual-ack mode but never acknowledges,
 * so the example shows deliveries that stay unacknowledged.
 *
 * Resolves once the consumer is registered; rejects if registration fails.
 */
async function RunConsumerNoAck (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  const QUEUE = 'work.notification.queue'
  await b.addConsumer(
    QUEUE,
    async (msg) => {
      logger.info('RunConsumerNoAck: ', msg?.fields.deliveryTag, msg?.properties.headers?.source, msg?.content.toString())
    },
  )
}
/**
 * Runs the example end to end: rebuilds the broker topology, then starts the
 * producer and the three consumers (started in that order, set up concurrently).
 *
 * Rejects if the setup or any start-up step fails.
 */
async function example1 (): Promise<void> {
  await SetupResources()
  await Promise.all([
    RunProducers(),
    RunConsumerAutoAck(),
    RunConsumerManualAck(),
    RunConsumerNoAck()
  ])
}

// Entry point: report start-up failures instead of leaving an unhandled rejection.
example1().catch((err: unknown) => {
  logger.error('example1 failed:', err)
  process.exitCode = 1
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment