Skip to content

Instantly share code, notes, and snippets.

@witalobenicio
Last active July 30, 2021 09:47
Show Gist options
  • Save witalobenicio/436ca69e3fa666447fc5bb4551c7a361 to your computer and use it in GitHub Desktop.
Save witalobenicio/436ca69e3fa666447fc5bb4551c7a361 to your computer and use it in GitHub Desktop.
import {
ChangeMessageVisibilityCommand,
ChangeMessageVisibilityCommandOutput,
DeleteMessageCommand,
DeleteMessageCommandOutput,
Message,
ReceiveMessageCommand,
SendMessageBatchCommand,
SendMessageBatchRequestEntry,
SendMessageCommand,
SendMessageCommandOutput,
SQSClient,
SQSClientConfig,
} from '@aws-sdk/client-sqs';
import config from '@config';
import { IQueue } from '@managers/queue/types/Queue';
import chunkify from '@core/utils/chunkify';
class QueueManager {
private awsConfig: SQSClientConfig = {
region: config.awsRegion,
credentials: {
accessKeyId: config.awsAccessKey,
secretAccessKey: config.awsAccessSecret,
},
};
private sqs: SQSClient = new SQSClient(this.awsConfig);
// Max number of messages to receive
private readonly batch: number = 10;
// Max number of seconds that message will 'belong' to a receiver
private readonly visibilityTimeout: number = 8 * 60;
public async send(body: string, to?: IQueue): Promise<void> {
if (!body) { throw new Error('You need to provide a body'); }
if (!to) { throw new Error('You need to provide a queue destination'); }
const command = new SendMessageCommand({
QueueUrl: to.getUrl(),
MessageBody: body,
});
return this.sqs.send(command);
}
public async sendMultiple(bodies: string[], to: IQueue): Promise<void> {
if (!bodies || !bodies.length) { throw new Error('You need to provide a body'); }
const requests = [];
// Separate our bigger array into multiple arrays of batch max size so we don't get an error
chunkify(tasks, this.batch).forEach(chunk => {
const entries = chunk.map(body => ({ MessageBody: body })) as unknown as SendMessageBatchRequestEntry[];
const command = new SendMessageBatchCommand({
QueueUrl: to.getUrl(),
Entries: entries,
});
requests.push(this.sqs.send(command));
});
return Promise.all(requests);
}
public async releaseMessage(receiptHandle: string, from: IQueue): Promise<ChangeMessageVisibilityCommandOutput> {
const command = new ChangeMessageVisibilityCommand({
QueueUrl: from.getUrl(),
VisibilityTimeout: 0,
ReceiptHandle: receiptHandle,
});
return this.sqs.send(command);
}
public async receiveWithPriority(from: IQueue, quantity: number): Promise<Message[]> {
const messages = await this.receiveFromQueue(from, quantity, 0);
if ((!messages || !messages.length) && from.getPriorityQueue()) {
return this.receive(from.getPriorityQueue(), quantity);
}
return messages;
}
public async receive(from: IQueue, quantity: number): Promise<Message[]> {
return this.receiveWithPriority(from, quantity);
}
public async delete(receiptHandle: string, from: IQueue): Promise<DeleteMessageCommandOutput> {
const command = new DeleteMessageCommand({
QueueUrl: from.getUrl(),
ReceiptHandle: receiptHandle,
});
return this.sqs.send(command);
}
private async receiveFromQueue(from: IQueue, quantity: number, waitTime: number): Promise<Message[]> {
const usedQuantity = quantity === undefined || quantity > this.batch ? this.batch : quantity;
const command = new ReceiveMessageCommand({
QueueUrl: from.getUrl(),
MaxNumberOfMessages: usedQuantity,
VisibilityTimeout: this.visibilityTimeout,
WaitTimeSeconds: waitTime,
});
const data = await this.sqs.send(command);
return data.Messages;
}
}
export default QueueManager;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment