Skip to content

Instantly share code, notes, and snippets.

@jakubturcovsky
Created January 30, 2019 17:11
Show Gist options
  • Save jakubturcovsky/7a8c7d33c5130b756b9c20ff86bf18ae to your computer and use it in GitHub Desktop.
Save jakubturcovsky/7a8c7d33c5130b756b9c20ff86bf18ae to your computer and use it in GitHub Desktop.
Code for reading all messages from an SQS queue, sorting the lines and counting them
let fs = require('fs');
let aws = require('aws-sdk');
let exec = require('child_process').exec;
let SqsQueueParallel = require('sqs-queue-parallel');
// Queue URL passed as QueueUrl to receiveMessage by serialPoll().
let sqsUri = "URI TO SQS QUEUE"; // TODO add URI
// File that collects one JSON-serialized entry per line while polling.
const pollFile = "sqspoll.txt";
// File that receives the sorted, de-duplicated lines of pollFile.
const sortFile = "sort.txt";
// Clear the output of a previous run before the append-mode stream below is
// opened (removeOldFiles is a hoisted function declaration, so it is callable here).
removeOldFiles();
// Progress counter: incremented once per 'message' event in parallelPoll() and
// once per non-empty receiveMessage response in pollMessage().
var i = 0;
// Append-mode write stream to pollFile, shared by every appendToStream() call.
let pollFileStream = fs.createWriteStream(pollFile, {'flags': 'a'});
/**
 * Deletes the poll and sort output files left over from a previous run.
 *
 * Each file is removed independently, so a missing poll file no longer
 * prevents a stale sort file from being deleted. A file that does not exist
 * is not an error; any other failure is logged and otherwise ignored, which
 * keeps this step best-effort.
 */
function removeOldFiles() {
    [pollFile, sortFile].forEach(function (file) {
        try {
            fs.unlinkSync(file);
        } catch (err) {
            // ENOENT just means there was nothing to clean up.
            if (err.code !== 'ENOENT') {
                console.log("Could not remove " + file, err);
            }
        }
    });
}
// TODO call parallelPoll() or serialPoll()
/**
 * Concurrent polling through sqs-queue-parallel.
 *
 * Reads indefinitely: nothing here detects that the queue is drained, so you
 * have to check yourself that everything was read and deleted, and then count
 * the messages manually.
 *
 * For every 'message' event the progress counter is bumped, each element of
 * e.data is appended to the poll file as one JSON line, and the message is
 * deleted before the next one is requested.
 */
function parallelPoll() {
    let queue = new SqsQueueParallel({
        accessKeyId: "KEY", // TODO add access key
        secretAccessKey: "KEY", // TODO add secret access key
        region: "REGION", // TODO add region
        name: "QUEUE", // TODO add name of the queue
        maxNumberOfMessages: 4, // Max number of messages in one request (max 10)
        concurrency: 8, // Number of concurrent readers
        // NOTE(review): the other options are camelCase; confirm the library
        // honours these two capitalised keys rather than expecting
        // visibilityTimeout / waitTimeSeconds.
        VisibilityTimeout: 20, // Hide read messages for n seconds
        WaitTimeSeconds: 0
    });
    queue.on('message', function (e) {
        i++;
        console.log("i = %d", i);
        // e.data is iterated with forEach, i.e. it is expected to be an array.
        e.data.forEach(function (message) {
            appendMessage(JSON.stringify(message));
        });
        e.deleteMessage(function (err, data) {
            // Surface failed deletes instead of dropping them silently; keep
            // polling either way so one failure does not stall the reader.
            if (err) {
                console.log("Delete Error", err);
            }
            e.next();
        });
    });
    queue.on('error', function (err) {
        console.log('There was an error: ', err);
    });
}
// Receive-request parameters and SQS client shared with pollMessage();
// both are populated by serialPoll().
let params;
let sqs;
/**
 * Sequential polling: starts the pollMessage() loop, which issues one receive
 * request at a time.
 * The loop ends at the first response that carries no messages and then runs
 * finalProcessing().
 * Create a config.json file with one object containing accessKeyId,
 * secretAccessKey and region.
 */
function serialPoll() {
    aws.config.loadFromPath('./config.json');
    const client = new aws.SQS({apiVersion: '2012-11-05'});
    const receiveOptions = {
        AttributeNames: ["SentTimestamp"],
        MaxNumberOfMessages: 10, // Max number of messages in one request (max 10)
        MessageAttributeNames: ["All"],
        QueueUrl: sqsUri,
        VisibilityTimeout: 20, // Hide read messages for n seconds
        // NOTE(review): 0 presumably selects SQS short polling, which may
        // return an empty response while messages remain — confirm this is
        // acceptable as the "queue is empty" signal.
        WaitTimeSeconds: 0
    };
    sqs = client;
    params = receiveOptions;
    pollMessage();
}
/**
 * Issues one receiveMessage request and schedules the next step.
 *
 * - On a receive error the request is retried after `retryDelayMs`, instead
 *   of immediately, so a persistent failure (bad credentials, no network)
 *   does not turn into a tight request loop.
 * - On a non-empty batch every message body is handed to parseMessages() and
 *   the next request is issued right away.
 * - On an empty response (no `Messages`, or an empty array) polling stops and
 *   finalProcessing() runs.
 *
 * @param {number} [retryDelayMs=1000] - Pause before retrying a failed receive.
 */
function pollMessage(retryDelayMs = 1000) {
    sqs.receiveMessage(params, function (err, data) {
        if (err) {
            console.log("Receive Error", err);
            setTimeout(pollMessage, retryDelayMs, retryDelayMs);
        } else if (data.Messages && data.Messages.length > 0) {
            i++;
            data.Messages.forEach(function (message) {
                parseMessages(message.Body);
            });
            // Uncomment to delete every message of this batch after it was written
            // data.Messages.forEach(function (message) {
            //     var deleteParams = {
            //         QueueUrl: sqsUri,
            //         ReceiptHandle: message.ReceiptHandle
            //     };
            //     sqs.deleteMessage(deleteParams, function (err, data) {
            //         if (err) {
            //             console.log("Delete Error", err);
            //         } else {
            //             console.log("Message Deleted", data);
            //         }
            //     });
            // });
            pollMessage(retryDelayMs);
        } else {
            console.log("Finished, i = %d", i);
            finalProcessing();
            return;
        }
        // Progress output after an error or a processed batch.
        console.log("i = %d", i);
    });
}
/**
 * Parses one message body and appends its entries to the poll file, one JSON
 * line per entry.
 *
 * A body holding a JSON array contributes one line per element. Any other
 * JSON value is treated as a single entry, so a lone object no longer aborts
 * the whole poll with a TypeError.
 *
 * @param {string} message - Raw message body; must be valid JSON.
 * @throws {SyntaxError} If the body is not valid JSON.
 */
function parseMessages(message) {
    const parsed = JSON.parse(message);
    const entries = Array.isArray(parsed) ? parsed : [parsed];
    entries.forEach(function (entry) {
        appendMessage(JSON.stringify(entry));
    });
}
/**
 * Appends one entry to the poll file as its own line.
 * @param {string} message - Serialized entry, without a trailing newline.
 */
function appendMessage(message) {
    const line = message + "\n";
    appendToStream(line);
}
/**
 * Writes text to the shared poll-file stream exactly as given.
 * @param {string} message - Text to write; no newline is added here.
 */
function appendToStream(message) {
    const target = pollFileStream;
    target.write(message);
}
/**
 * Closes the poll file, then sorts/de-duplicates it into the sort file and
 * counts the lines of the poll file.
 *
 * Ending a write stream is asynchronous, so the shell commands are started
 * only from the end() callback. Starting them straight away could let them
 * read the file before the buffered writes reach disk.
 */
function finalProcessing() {
    pollFileStream.end(function (err) {
        // Some Node versions pass a stream error to this callback; report it,
        // but still process whatever made it into the file.
        if (err) {
            console.log("Poll file stream error", err);
        }
        // Both helpers launch their command asynchronously, so they are
        // started back-to-back here.
        sort(pollFile, sortFile);
        countLines(pollFile);
    });
}
/**
 * Sorts the lines of a file and writes the unique lines to another file by
 * running `sort | uniq` in a shell.
 *
 * The command's stdout and stderr are logged when it completes, followed by
 * the error if the command failed.
 *
 * @param {string} fileName - File to sort. Previously this argument was
 *     ignored and the global poll file was always used.
 * @param {string} resultFileName - File that receives the sorted, unique lines.
 */
function sort(fileName, resultFileName) {
    exec("sort " + fileName + " | uniq > " + resultFileName, function (error, stdout, stderr) {
        console.log(`${stdout}`);
        console.log(`${stderr}`);
        if (error !== null) {
            console.log(`sort uniq error: ${error}`);
        }
    });
}
/**
 * Runs `wc` on a file and logs what the command printed.
 *
 * stdout and stderr are always logged; the error is logged as well when the
 * command did not succeed.
 *
 * @param {string} file - File handed to `wc`.
 */
function countLines(file) {
    const command = "wc " + file;
    const report = function (error, stdout, stderr) {
        console.log(`${stdout}`);
        console.log(`${stderr}`);
        if (error === null) {
            return;
        }
        console.log(`wc error: ${error}`);
    };
    exec(command, report);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment