Skip to content

Instantly share code, notes, and snippets.

@mahi424
Created February 17, 2022 09:03
Show Gist options
  • Save mahi424/e06c603accefe1242ffa6fadaf1a7c87 to your computer and use it in GitHub Desktop.
Save mahi424/e06c603accefe1242ffa6fadaf1a7c87 to your computer and use it in GitHub Desktop.
const fs = require('fs');
// Load the AWS SDK for Node.js
const AWS = require('aws-sdk');
// Export the AWS credentials in your shell before running this script:
/*
export AWS_ACCESS_KEY_ID="<AWS_ACCESS_KEY_ID>"
export AWS_SECRET_ACCESS_KEY="<AWS_SECRET_ACCESS_KEY>"
export AWS_SESSION_TOKEN="<AWS_SESSION_TOKEN>"
*/
// Set the region used by every AWS client created below.
// AWS_REGION wins when it is set (and non-empty); otherwise fall back to
// us-east-1. The operands were previously reversed, which made the literal
// always win and silently ignored the environment variable.
AWS.config.update({
  region: process.env.AWS_REGION || 'us-east-1',
  // Credentials are not set explicitly here; they are expected to come from
  // the environment variables exported in the shell.
  // secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  // accessKeyId: process.env.AWS_ACCESS_KEY_ID,
});
// Resolve the credentials at startup and report the outcome.
AWS.config.getCredentials((err) => {
  if (err) {
    // credentials not loaded
    console.error(err.stack);
    return;
  }
  // Log only the access key id. Printing the whole credentials object would
  // also write the secret access key and session token to the console.
  console.log('Access key:', AWS.config.credentials.accessKeyId);
});
/**
 * Small promise-based helper around an `AWS.SQS` client bound to one default
 * queue.
 *
 * Every method takes a single options object. `QueueUrl` may be passed to
 * target another queue; when omitted, the URL given to the constructor is
 * used.
 */
class SqsUtils {
  /**
   * @param {{ queueUrl: string }} [options] - `queueUrl` is the default queue
   *   for all subsequent calls.
   * @throws {Error} 'queueUrl is required' when `queueUrl` is missing or empty
   *   (including when the constructor is called with no argument at all).
   */
  constructor({ queueUrl } = {}) {
    if (!queueUrl) {
      throw new Error('queueUrl is required');
    }
    this.sqs = new AWS.SQS({ apiVersion: '2012-11-05' });
    this.queueUrl = queueUrl;
  }

  /* static getSQSUrlFromArn(arn) {
    const arnParts = arn.split(':');
    const [, , serviceName, region, queueOwnerAWSAccountId, queueName] =
      arnParts;
    return `https://sqs.${region}.amazonaws.com/${queueOwnerAWSAccountId}/${queueName}`;
  } */

  /* sendMessage({
    message,
    queueUrl = this.queueUrl,
    delaySeconds = 0,
    messageGroupId,
    messageDeduplicationId,
  }) {
    const params = {
      MessageBody: message,
      QueueUrl: queueUrl,
      DelaySeconds: delaySeconds,
      MessageGroupId: messageGroupId,
      MessageDeduplicationId: messageDeduplicationId,
    };
    // console.log(params);
    return this.sqs.sendMessage(params).promise();
  } */

  /**
   * Receives up to `MaxNumberOfMessages` messages from the queue.
   *
   * @param {object} [options]
   * @param {string} [options.QueueUrl] - Defaults to the instance queue URL.
   * @param {number} [options.VisibilityTimeout=0] - Forwarded to SQS as-is;
   *   per the SQS API this is the number of seconds the received messages
   *   stay hidden from other receive calls.
   * @param {number} [options.MaxNumberOfMessages=1] - Forwarded to SQS as-is.
   * @returns {Promise<object>} The raw `receiveMessage` response.
   */
  receiveMessage({
    QueueUrl = this.queueUrl,
    VisibilityTimeout = 0,
    MaxNumberOfMessages = 1,
  } = {}) {
    const params = {
      QueueUrl,
      VisibilityTimeout,
      MaxNumberOfMessages,
    };
    return this.sqs.receiveMessage(params).promise();
  }

  /**
   * Reads the queue's `ApproximateNumberOfMessages` attribute.
   *
   * @param {object} [options]
   * @param {string} [options.QueueUrl] - Defaults to the instance queue URL.
   * @param {string[]} [options.AttributeNames] - Attributes to request. The
   *   result is always read from `ApproximateNumberOfMessages`, so an
   *   override that leaves it out makes this method resolve to `NaN`.
   * @returns {Promise<number>} The attribute converted to a number.
   */
  async getMessageCount({
    QueueUrl = this.queueUrl,
    AttributeNames = ['ApproximateNumberOfMessages'],
  } = {}) {
    const response = await this.sqs
      .getQueueAttributes({ QueueUrl, AttributeNames })
      .promise();
    // SQS returns attribute values as strings; convert to a number.
    return Number(response.Attributes.ApproximateNumberOfMessages);
  }

  /**
   * Deletes one message from the queue.
   *
   * @param {object} options
   * @param {string} options.ReceiptHandle - Handle taken from a received message.
   * @param {string} [options.QueueUrl] - Defaults to the instance queue URL.
   * @returns {Promise<object>} The raw `deleteMessage` response.
   */
  deleteMessage({ ReceiptHandle, QueueUrl = this.queueUrl }) {
    return this.sqs.deleteMessage({ ReceiptHandle, QueueUrl }).promise();
  }
}
/**
 * Receives the messages currently reported on an SQS queue and dumps them to
 * `<last path segment of queueUrl>.json`.
 *
 * Messages are received with a 600 second visibility timeout and are NOT
 * deleted from the queue.
 *
 * @param {string} queueUrl - URL of the queue to read.
 * @returns {Promise<object[]>} Every message that was received.
 */
async function fetchAllMessages(queueUrl) {
  const sqs = new SqsUtils({ queueUrl });
  const size = await sqs.getMessageCount({});
  console.log(size);

  const messages = [];
  let attempt = 0;
  // A receive call may return fewer messages than the 10 requested, so a
  // fixed ceil(size / 10) number of calls can leave messages behind. Keep
  // receiving until the reported count has been collected or the queue
  // returns nothing.
  while (messages.length < size) {
    const response = await sqs.receiveMessage({
      VisibilityTimeout: 600,
      MaxNumberOfMessages: 10,
    });
    console.log(attempt, response);
    const batch = response && response.Messages;
    console.log('messages', JSON.stringify(batch));
    if (!batch || batch.length === 0) {
      console.log('breaking');
      break;
    }
    messages.push(...batch);
    console.log(messages.length);
    attempt += 1;
  }

  // Write the collected messages once, as a single JSON array. Writing the
  // growing array after every batch stored each earlier batch repeatedly.
  if (messages.length > 0) {
    const fileName = `${queueUrl.split('/').pop()}.json`;
    await writeDataToFile({ data: messages, filename: fileName });
  }
  return messages;
}
/**
 * Serialises `data` as JSON and writes it to `filename`, replacing any
 * existing content so the file always holds exactly one JSON document.
 * (Appending a second document to the same file would make it unparseable.)
 *
 * @param {object} [options]
 * @param {string} [options.filename='input.json'] - Destination path.
 * @param {*} [options.data={}] - Value passed to `JSON.stringify`.
 * @returns {Promise<void>} Resolves once the file is written; rejects with
 *   the underlying file-system error otherwise.
 */
function writeDataToFile({ filename = 'input.json', data = {} } = {}) {
  return fs.promises.writeFile(filename, JSON.stringify(data));
}
// Queue to dump. Replace this placeholder with the real SQS queue URL.
const queueUrl = 'SQS_QUEUE_URL';

fetchAllMessages(queueUrl)
  .then(() => {
    console.log('done');
  })
  .catch((e) => {
    console.error(e);
    // Report the failure to the calling shell; without this the process
    // would still exit with status 0 after logging the error.
    process.exitCode = 1;
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment