Skip to content

Instantly share code, notes, and snippets.

@TechnotronicOz
Created December 17, 2019 18:49
Show Gist options
  • Save TechnotronicOz/f9b56c8bd49678fb5db925fdabc2b498 to your computer and use it in GitHub Desktop.
Save TechnotronicOz/f9b56c8bd49678fb5db925fdabc2b498 to your computer and use it in GitHub Desktop.
amqp.service.ts
import { Injectable, Logger } from '@nestjs/common';
import * as amqp from 'amqplib';
import { ConfigService } from '../config/config.service';
/**
 * Structural description of the channel operations this file relies on.
 *
 * Return types are deliberately left as `any`: the declarations do not
 * constrain what the underlying channel implementation returns.
 */
export interface AmqpChannel {
  /** Called with the message that is to be acknowledged. */
  ack(message: AmqpMessage): any;

  /** Called with the name of the queue to assert. */
  assertQueue(queueName: string): any;

  /** Called with a queue name and the callback to register for that queue. */
  consume(queueName: string, cb: any): any;

  /** Called with a queue name and the payload to send to it. */
  sendToQueue(queueName: string, message: any): any;
}
/**
 * Structural description of a connection: the only capability required of
 * it is the ability to create a channel.
 */
export interface AmqpConnection {
  /** Creates a channel; the result type is not constrained (`any`). */
  createChannel(): any;
}
/**
 * Shape of a message as seen by this file: only the payload is described.
 */
export interface AmqpMessage {
// Message payload, declared as a string.
// NOTE(review): amqplib delivers message content as a Buffer — confirm
// whether `string` is the intended type here.
content: string;
}
/**
 * Injectable service that owns one AMQP connection and a cache of named
 * channels created on that connection.
 *
 * The connection is started in the constructor and kept as a promise, so
 * every method waits for it to be established instead of assuming the
 * connect call has already completed.
 */
@Injectable()
export class AmqpService {
  private readonly logger: Logger = new Logger(AmqpService.name);

  /** Settles when the `amqp.connect` call started in the constructor finishes. */
  private readonly connectionReady: Promise<AmqpConnection>;

  /**
   * Channels keyed by caller-chosen name. Promises are stored (rather than
   * resolved channels) so that concurrent requests for the same name share a
   * single `createChannel` call.
   */
  private readonly channelMap: Map<string, Promise<AmqpChannel>> = new Map();

  /**
   * Starts connecting using the connection string from the config service.
   * The constructor does not wait for the connection; methods do.
   *
   * @param configService source of the AMQP connection string
   */
  constructor(
    private readonly configService: ConfigService,
  ) {
    this.logger.log('connection to amqp...');
    // Promise.resolve normalises whatever thenable amqp.connect returns into
    // a native Promise; it is evaluated synchronously, so a synchronous throw
    // from the config lookup still surfaces from the constructor.
    this.connectionReady = Promise.resolve(
      amqp.connect(this.configService.amqpConfigConnectionString()),
    ).then(connection => {
      this.logger.log('connected!');
      return connection;
    });
    // Log a failed connect here so it is never an unhandled rejection; the
    // same error is still delivered to every method that awaits the connection.
    this.connectionReady.catch((err: unknown) => {
      const detail = err instanceof Error ? err.message : String(err);
      this.logger.error(`failed to connect to amqp: ${detail}`);
    });
  }

  /**
   * Returns the channel registered under `channelName`, creating it on the
   * connection first if it does not exist yet.
   *
   * @param channelName cache key identifying the channel
   * @returns the cached or newly created channel
   * @throws rejects if connecting or creating the channel fails; a failed
   *         creation is not cached, so a later call tries again
   */
  async getOrCreateChannel(channelName: string): Promise<AmqpChannel> {
    const cached = this.channelMap.get(channelName);
    if (cached !== undefined) {
      this.logger.debug(`getOrCreateChannel exists in cache [channelName=${channelName}]`);
      return cached;
    }

    this.logger.log(`getOrCreateChannel channelMap does not have channelName [${channelName}]`);
    const pending: Promise<AmqpChannel> = this.connectionReady.then(connection =>
      connection.createChannel(),
    );
    // Register before any await so a concurrent caller finds this entry
    // instead of opening a second channel under the same name.
    this.channelMap.set(channelName, pending);
    pending.catch(() => {
      // Evict only our own entry; the caller receives the rejection via `pending`.
      if (this.channelMap.get(channelName) === pending) {
        this.channelMap.delete(channelName);
      }
    });
    return pending;
  }

  /**
   * Acknowledges `message` on the channel registered under `channelName`.
   *
   * @param channelName cache key identifying the channel
   * @param message the message to acknowledge
   * @returns whatever the channel's `ack` returns
   */
  async ackMessage(channelName: string, message: AmqpMessage) {
    const ch = await this.getOrCreateChannel(channelName);
    this.logger.debug(`ackMessage [channelName=${channelName}, message=${message.content.toString()}]`);
    return ch.ack(message);
  }

  /**
   * Asserts `queueName` and registers `cbFn` as its consumer on the channel
   * registered under `channelName`.
   *
   * @param channelName cache key identifying the channel
   * @param queueName queue to assert and consume from
   * @param cbFn callback handed to the channel's `consume`
   */
  async consume(channelName: string, queueName: string, cbFn) {
    this.logger.log(`consume [channelName=${channelName}, queue=${queueName}]`);
    const ch = await this.getOrCreateChannel(channelName);
    await ch.assertQueue(queueName);
    await ch.consume(queueName, cbFn);
  }

  /**
   * Asserts `queueName` and sends `message` to it on the channel registered
   * under `channelName`.
   *
   * @param channelName cache key identifying the channel
   * @param queueName queue to assert and publish to
   * @param message payload, converted to a Buffer before sending
   * @throws rejects if the queue assertion fails
   */
  async publish(channelName: string, queueName: string, message: string) {
    this.logger.log(`publish [channelName=${channelName}, queueName=${queueName}]`);
    const ch = await this.getOrCreateChannel(channelName);
    // Await the assertion so a failure reaches the caller rather than
    // becoming an unhandled rejection.
    await ch.assertQueue(queueName);
    ch.sendToQueue(queueName, Buffer.from(message));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment