Created
June 12, 2024 13:37
-
-
Save andersonbosa/215dc9788842c07950d6108d02b63a14 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ------------------------------- lib | |
import amqp from 'amqplib' | |
import { v4 as uuidv4 } from 'uuid' | |
import { | |
Broker, | |
BrokerBinding, | |
BrokerConfig, | |
BrokerExchange, | |
BrokerQueue, | |
BrokerResources | |
} from '../@types' | |
/* The connection is created and maintained until it is closed. */ | |
/**
 * RabbitMQ broker that lazily opens a single connection/channel pair and
 * reuses it for every operation until it is closed.
 *
 * Every public operation connects on demand. The publish helpers additionally
 * schedule an idle close, so a broker used only for publishing releases its
 * connection once it stops sending.
 */
export class RabbitMQStatefulBroker implements Broker {
  public _connection: amqp.Connection | null = null
  public _channel: amqp.Channel | null = null
  /** Registered consumers keyed by queue name: `{ handler, consumer, channel }`. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- callers read loosely typed entries
  public consumersMap = new Map<string, any>()

  /** In-flight connection attempt, shared by concurrent `connect()` callers. */
  private connecting: Promise<void> | null = null
  /** Pending timer armed by `reconnect()`. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null
  /** Pending timer armed after a publish; re-armed on every publish. */
  private idleCloseTimer: ReturnType<typeof setTimeout> | null = null

  constructor(public config: BrokerConfig) {}

  /**
   * Closes the current connection (if any) and forgets the connection and
   * channel so that the next operation opens a fresh pair.
   * Pending reconnect / idle-close timers are cancelled. Never rejects:
   * a failure to close is logged.
   */
  async close(): Promise<void> {
    this.config?.logger?.info(`[${__filename}:#close] closing connection...`)
    this.clearTimers()

    const connection = this._connection
    // Drop the references first so concurrent callers never pick up a closing channel.
    this._connection = null
    this._channel = null
    if (!connection) return

    try {
      await connection.close()
    } catch (err) {
      this.config?.logger?.warn(`[${__filename}:#close] connection could not be closed cleanly.`, err)
    }
  }

  /**
   * Closes whatever is left of the current connection and schedules a new
   * connection attempt after `config.reconnectTime` milliseconds (default 5000).
   */
  async reconnect(): Promise<void> {
    const reconnectTime = this.config.reconnectTime ?? 5000
    this.config?.logger?.error(`[${__filename}#reconnect] trying to reconnect in ${reconnectTime}`)

    await this.close()
    // Arrow function keeps `this` bound; passing `this.connect` directly would lose it.
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      void this.connect()
    }, reconnectTime)
  }

  /**
   * Opens the connection and channel unless already connected.
   * Concurrent callers share one attempt instead of opening one connection each.
   * Does not reject on failure: the error is logged and a reconnect is scheduled,
   * so callers must still check that a channel is available afterwards.
   */
  async connect(): Promise<void> {
    if (this._connection) {
      this.config?.logger?.info(`[${__filename}:#connect] looks like broker is already connected, skip`)
      return
    }
    if (!this.connecting) {
      this.connecting = this.openConnection().finally(() => {
        this.connecting = null
      })
    }
    await this.connecting
  }

  /**
   * Deletes every exchange and queue listed in `config.resources`.
   * Resolves only after all deletions have completed.
   *
   * @throws Error when no channel is available, or when a deletion fails.
   */
  async destroyResources(): Promise<void> {
    const channel = await this.ensureChannel()
    const resources = this.config.resources
    if (!resources) return

    await Promise.all(
      resources.exchanges.map(async (exchange: BrokerExchange) => {
        const deletedResource = await channel.deleteExchange(exchange.name)
        this.config?.logger?.info(`[${__filename}#destroyResources] deleted exchange = `, deletedResource)
      })
    )
    await Promise.all(
      resources.queues.map(async (queue: BrokerQueue) => {
        const deletedResource = await channel.deleteQueue(queue.name)
        this.config?.logger?.info(`[${__filename}#destroyResources] deleted queue = `, deletedResource)
      })
    )
  }

  /**
   * Creates the exchanges, queues and bindings described by `resources`
   * (falling back to `config.resources`).
   *
   * Exchanges and queues are asserted before any binding is created, because a
   * binding refers to both. Failures are logged, not rethrown.
   *
   * @param resources Optional resource set overriding `config.resources`.
   * @throws Error when no channel is available.
   */
  async createResources(resources?: BrokerResources | void): Promise<void> {
    const channel = await this.ensureChannel()

    try {
      const resourcesToCreate = resources ? resources : this.config.resources
      if (!resourcesToCreate) throw new Error('Resources was not specified.')

      await Promise.all(
        resourcesToCreate.exchanges.map(async (exchange: BrokerExchange) => {
          const createdExchange = await channel.assertExchange(
            exchange.name,
            exchange.type,
            exchange.options as amqp.Options.AssertExchange
          )
          this.config?.logger?.info(`[${__filename}#createResources] created exchange = `, createdExchange)
        })
      )

      await Promise.all(
        resourcesToCreate.queues.map(async (queue: BrokerQueue) => {
          const createdQueue = await channel.assertQueue(
            queue.name,
            queue.options as amqp.Options.AssertQueue
          )
          this.config?.logger?.info(`[${__filename}#createResources] created queue = `, createdQueue)
        })
      )

      await Promise.all(
        resourcesToCreate.binding.map(async (binding: BrokerBinding) => {
          await channel.bindQueue(binding.targetQueue, binding.exchange, binding.routingKeys)
          this.config?.logger?.info(`[${__filename}#createResources] created binding = `, binding)
        })
      )
    } catch (error) {
      this.config?.logger?.error(`[${__filename}#createResources]:`, error)
    }
  }

  /**
   * Starts consuming `targetQueue` with `handler` and records the consumer in
   * `consumersMap` under the queue name.
   *
   * @param targetQueue Queue to consume from.
   * @param handler Callback invoked for each delivery.
   * @param options Consume options; `noAck` defaults to `false` (manual ack).
   * @throws Error when no channel is available.
   */
  async addConsumer (
    targetQueue: string,
    handler: (msg: amqp.ConsumeMessage | null) => Promise<void>,
    options?: amqp.Options.Consume
  ): Promise<void> {
    const channel = await this.ensureChannel()

    const consumeOptions: amqp.Options.Consume = {
      noAck: false, // true = auto ack, false = manual and should be implemented
      ...options,
    }
    const consumer = await channel.consume(targetQueue, handler.bind(channel), consumeOptions)

    this.consumersMap.set(targetQueue, { handler, consumer, channel })
  }

  /**
   * Publishes `message` (JSON-encoded) to `exchangeName` with `routingKey`.
   *
   * @returns The value returned by `channel.publish`, or `false` when no
   *          channel is available or publishing throws.
   */
  async sendToExchange (
    exchangeName: string,
    routingKey: string,
    message: any,
    options?: amqp.Options.Publish
  ): Promise<boolean> {
    if (!this._connection) {
      await this.connect()
    }

    const publishOptions = this.buildPublishOptions(`${exchangeName}:${routingKey}`, options)

    try {
      this.config?.logger?.info(`[${__filename}] publishing message to exchange.`)
      const channel = this._channel
      if (!channel) return false
      return channel.publish(
        exchangeName,
        routingKey,
        Buffer.from(JSON.stringify(message)),
        publishOptions
      )
    } catch (error) {
      this.config.logger?.error(`[${__filename}] Error sending message to exchange:`, error)
      return false
    } finally {
      this.scheduleIdleClose(5000)
    }
  }

  /**
   * Sends `message` (JSON-encoded) directly to `queueName`.
   *
   * @returns The value returned by `channel.sendToQueue`, or `false` when sending throws.
   * @throws Error when no channel is available.
   */
  async sendToQueue (queueName: string, message: any, options?: amqp.Options.Publish): Promise<boolean> {
    const channel = await this.ensureChannel()

    const publishOptions = this.buildPublishOptions(`${queueName}`, options)

    try {
      this.config?.logger?.info(`[${__filename}] sending message to queue.`)
      return channel.sendToQueue(
        queueName,
        Buffer.from(JSON.stringify(message)),
        publishOptions
      )
    } catch (error) {
      this.config.logger?.error(`[${__filename}] Error sending message to queue:`, error)
      return false
    } finally {
      this.scheduleIdleClose(3000)
    }
  }

  /** Performs one connection attempt; on failure logs and schedules a reconnect. */
  private async openConnection (): Promise<void> {
    let connection: amqp.Connection | null = null
    try {
      connection = await amqp.connect(this.config.connection.url)
      const opened = connection

      // Listeners are attached before the channel is opened so that an early
      // connection error is not left without a handler.
      opened.on('error', (err: unknown) => {
        this.forgetConnection(opened)
        this.config?.logger?.error(`[${__filename}#onError] connection error.`, err)
      })
      opened.on('close', () => {
        this.forgetConnection(opened)
        this.config?.logger?.warn(`[${__filename}#onClose] connection closed.`)
        // this.reconnect()
      })

      const channel = await opened.createChannel()
      channel.on('error', (err: unknown) => {
        this.config?.logger?.error(`[${__filename}#onError] channel error.`, err)
      })

      this._connection = opened
      this._channel = channel
      this.config?.logger?.info(`[${__filename}#connect] successfully connected!`)
    }
    catch (err) {
      this.config?.logger?.error(`[${__filename}#connect] error while connecting:`, err)
      // The connection may have opened before the channel failed; do not leak it.
      if (connection) await connection.close().catch(() => undefined)
      void this.reconnect()
    }
  }

  /** Connects on demand and returns the channel, throwing when none is available. */
  private async ensureChannel (): Promise<amqp.Channel> {
    if (!this._connection) await this.connect()
    if (!this._channel) throw new Error('Channel not available.')
    return this._channel
  }

  /** Clears the stored connection/channel, but only if `connection` is still the current one. */
  private forgetConnection (connection: amqp.Connection): void {
    if (this._connection !== connection) return
    this._connection = null
    this._channel = null
  }

  /** Builds the default publish properties, shallowly overridden by `options`. */
  private buildPublishOptions (source: string, options?: amqp.Options.Publish): amqp.Options.Publish {
    return {
      timestamp: Date.now(),
      contentEncoding: 'utf-8',
      contentType: 'application/json',
      persistent: false,
      headers: {
        messageId: uuidv4(),
        source,
      },
      ...options,
    }
  }

  /**
   * (Re)arms the idle-close timer so the connection is closed `delayMs` after
   * the most recent publish, rather than `delayMs` after the first one.
   * NOTE(review): the close affects the whole connection, including any
   * consumers registered on this broker instance.
   */
  private scheduleIdleClose (delayMs: number): void {
    if (this.idleCloseTimer) clearTimeout(this.idleCloseTimer)
    this.idleCloseTimer = setTimeout(() => {
      this.idleCloseTimer = null
      void this.close()
    }, delayMs)
  }

  /** Cancels any pending reconnect and idle-close timers. */
  private clearTimers (): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.idleCloseTimer) {
      clearTimeout(this.idleCloseTimer)
      this.idleCloseTimer = null
    }
  }
}
// -------------------------------- common | |
import createLogger from 'logging' | |
import { BrokerConfig } from '../../@types' | |
// Exchange and queue names are declared once so the resource list and the
// bindings below cannot drift apart.
const TASKS_EXCHANGE = 'work.tasks.exchange'
const EXPORTER_QUEUE = 'work.exporter.queue'
const IMPORTER_QUEUE = 'work.importer.queue'
const NOTIFICATION_QUEUE = 'work.notification.queue'

/** Describes a durable queue with the given name. */
const durableQueue = (name: string) => ({ name, options: { durable: true } })

/** Describes a binding from the tasks exchange to `targetQueue` for `routingKeys`. */
const bindToTasksExchange = (targetQueue: string, routingKeys: string) => ({
  exchange: TASKS_EXCHANGE,
  targetQueue,
  routingKeys,
})

/**
 * Shared example configuration: a local broker, one topic exchange and three
 * durable queues, each bound to the exchange by its own routing pattern.
 */
export const brokerConfiguration: BrokerConfig = {
  logger: createLogger('logger'),
  reconnectTime: 3000,
  connection: { url: 'amqp://admin:admin@localhost:5672' },
  resources: {
    exchanges: [
      { type: 'topic', name: TASKS_EXCHANGE, options: {} },
    ],
    queues: [
      durableQueue(EXPORTER_QUEUE),
      durableQueue(IMPORTER_QUEUE),
      durableQueue(NOTIFICATION_QUEUE),
    ],
    binding: [
      bindToTasksExchange(EXPORTER_QUEUE, 'task.export'),
      bindToTasksExchange(IMPORTER_QUEUE, 'task.import'),
      bindToTasksExchange(NOTIFICATION_QUEUE, 'notification.#'),
    ],
  },
}
// ------------------- usage | |
import createLogger from 'logging' | |
import { RabbitMQStatefulBroker } from '../broker_stateful' | |
import { brokerConfiguration } from './0_common' | |
// Logger dedicated to this example; it replaces the shared configuration's logger.
const logger = createLogger('example-1')

// Shallow copy of the shared configuration with only `logger` overridden.
const exampleConfiguration = Object.assign({}, brokerConfiguration, { logger })
/**
 * Recreates the broker resources from scratch: connects, deletes the
 * configured exchanges/queues, creates them again, waits three seconds and
 * closes the connection.
 *
 * Rejects if any step fails (the broker is still closed in that case), so the
 * caller is never left waiting on a promise that cannot settle.
 */
async function SetupResources (): Promise<void> {
  const b0 = new RabbitMQStatefulBroker(exampleConfiguration)
  try {
    await b0.connect()
    await b0.destroyResources()
    await b0.createResources()
    // Grace period kept from the original flow before the connection is released.
    await new Promise<void>((resolve) => setTimeout(resolve, 3000))
    logger.info('#SetupResources: continuando...')
  } finally {
    await b0.close()
  }
}
/**
 * Starts a producer that, every 300 ms, publishes two rounds of messages:
 * one directly to the exporter queue and two through the tasks exchange
 * (an import task and a notification).
 *
 * Publishing is fire-and-forget, but failures are logged instead of surfacing
 * as unhandled promise rejections.
 */
async function RunProducers (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)

  const reportFailure = (err: unknown): void => {
    logger.error('#RunProducers: failed to publish message:', err)
  }

  setInterval(
    () => {
      for (let index = 0; index < 2; index++) {
        b.sendToQueue('work.exporter.queue', index).catch(reportFailure)
        b.sendToExchange('work.tasks.exchange', 'task.import', index).catch(reportFailure)
        b.sendToExchange('work.tasks.exchange', `notification.index-${index}`, index).catch(reportFailure)
      }
    },
    300
  )
}
/**
 * Registers a consumer on the exporter queue with `noAck: true`, so no
 * acknowledgement is sent by the handler; each delivery is only logged.
 *
 * Resolves once the consumer is registered and rejects if registration fails.
 */
async function RunConsumerAutoAck (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  const QUEUE = 'work.exporter.queue'
  await b.addConsumer(
    QUEUE,
    async (msg) => {
      logger.info('RunConsumerAutoAck: ', msg?.fields.deliveryTag, msg?.properties.headers?.source, msg?.content.toString())
    },
    { noAck: true }
  )
}
/**
 * Registers a consumer on the importer queue using the broker's default
 * consume options and acknowledges every delivery explicitly.
 *
 * Resolves once the consumer is registered and rejects if registration fails.
 */
async function RunConsumerManualAck (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  const QUEUE = 'work.importer.queue'
  await b.addConsumer(
    QUEUE,
    async (msg) => {
      logger.info('RunConsumerManualAck: ', msg?.fields.deliveryTag, msg?.properties.headers?.source, msg?.content.toString())
      // The handler type allows `null`; there is nothing to acknowledge in that case.
      if (msg === null) return
      b.consumersMap.get(QUEUE)?.channel?.ack(msg)
    },
  )
}
/**
 * Registers a consumer on the notification queue using the broker's default
 * consume options and never acknowledges a delivery; each one is only logged.
 *
 * Resolves once the consumer is registered and rejects if registration fails.
 */
async function RunConsumerNoAck (): Promise<void> {
  const b = new RabbitMQStatefulBroker(exampleConfiguration)
  const QUEUE = 'work.notification.queue'
  await b.addConsumer(
    QUEUE,
    async (msg) => {
      logger.info('RunConsumerNoAck: ', msg?.fields.deliveryTag, msg?.properties.headers?.source, msg?.content.toString())
    },
  )
}
/**
 * Example entry point: rebuilds the broker resources, then starts the
 * producers and the three consumers concurrently.
 *
 * Rejects if setup or any of the start-up steps fails.
 */
async function example1 (): Promise<void> {
  await SetupResources()
  // All four are started in the same order as before; awaiting them together
  // lets a start-up failure reach the caller instead of being dropped.
  await Promise.all([
    RunProducers(),
    RunConsumerAutoAck(),
    RunConsumerManualAck(),
    RunConsumerNoAck(),
  ])
}

example1().catch((err: unknown) => {
  logger.error('#example1: failed:', err)
  process.exitCode = 1
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment