Created
January 30, 2019 17:11
-
-
Save jakubturcovsky/7a8c7d33c5130b756b9c20ff86bf18ae to your computer and use it in GitHub Desktop.
Code for reading all messages from SQS queue, sorting lines and counting them
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let fs = require('fs'); | |
let aws = require('aws-sdk'); | |
let exec = require('child_process').exec; | |
let SqsQueueParallel = require('sqs-queue-parallel'); | |
// URL of the queue read by serialPoll().
const sqsUri = "URI TO SQS QUEUE"; // TODO add URI

// pollFile receives every message as one line; sortFile receives the sorted, de-duplicated lines.
const pollFile = "sqspoll.txt";
const sortFile = "sort.txt";

// Clear leftovers of a previous run before the append-mode stream below is opened.
removeOldFiles();

// Counter incremented by the pollers each time a 'message' event / non-empty receive is handled.
let i = 0;

// Append-mode stream that all polled messages are written to.
const pollFileStream = fs.createWriteStream(pollFile, { flags: "a" });
/**
 * Deletes the output files (pollFile and sortFile) left over from a previous run.
 *
 * Each file is removed independently, so a missing pollFile no longer prevents
 * a stale sortFile from being deleted. A file that does not exist is the normal
 * first-run case and is ignored; any other failure is logged instead of being
 * silently swallowed. The function never throws.
 */
function removeOldFiles() {
    [pollFile, sortFile].forEach(function (file) {
        try {
            fs.unlinkSync(file);
        } catch (err) {
            // ENOENT just means there was nothing to clean up.
            if (!err || err.code !== "ENOENT") {
                console.log("Could not remove %s:", file, err);
            }
        }
    });
}
// TODO: call parallelPoll() or serialPoll() here to start reading the queue.
/**
 * Concurrent polling through sqs-queue-parallel.
 *
 * Reads indefinitely: nothing here detects an empty queue, so you have to check
 * yourself that everything was read and deleted, and then count the messages
 * manually.
 *
 * For every 'message' event the counter `i` is incremented, each element of
 * `e.data` is written to the poll file as one JSON line, and the message is
 * deleted from the queue before the next one is requested.
 *
 * NOTE(review): `e.data` is iterated with forEach, so the message body is
 * assumed to be a JSON array — confirm against the producers.
 * NOTE(review): the library may expect lower-camel-case option names
 * (`visibilityTimeout`, `waitTimeSeconds`); verify against its README.
 */
function parallelPoll() {
    const queue = new SqsQueueParallel({
        accessKeyId: "KEY", // TODO add access key
        secretAccessKey: "KEY", // TODO add secret access key
        region: "REGION", // TODO add region
        name: "QUEUE", // TODO add name of the queue
        maxNumberOfMessages: 4, // Max number of messages in one request (max 10)
        concurrency: 8, // Number of concurrent pollers
        VisibilityTimeout: 20, // Hide read messages for n seconds
        WaitTimeSeconds: 0
    });

    queue.on('message', function (e) {
        i++;
        console.log("i = %d", i);

        e.data.forEach(function (message) {
            appendMessage(JSON.stringify(message));
        });

        e.deleteMessage(function (err) {
            // A failed delete means the message will become visible again and be
            // re-read; report it instead of ignoring it.
            if (err) {
                console.log("Delete Error", err);
            }
            // Keep polling either way so one failed delete does not stall the reader.
            e.next();
        });
    });

    queue.on('error', function (err) {
        console.log('There was an error: ', err);
    });
}
// Shared with pollMessage(): the SQS client and the receiveMessage request parameters.
let sqs;
let params;

/**
 * Single threaded polling.
 *
 * Loads credentials from ./config.json (one object containing accessKeyId,
 * secretAccessKey and region), creates the SQS client, prepares the receive
 * parameters and starts the polling loop. When a receive returns no messages
 * the loop ends and the final processing (sort / count) runs.
 *
 * NOTE(review): WaitTimeSeconds is 0 (short polling); presumably an empty
 * response can occur while messages are still queued — confirm against the
 * SQS documentation before trusting the final count.
 */
function serialPoll() {
    aws.config.loadFromPath('./config.json');

    const client = new aws.SQS({apiVersion: '2012-11-05'});
    const receiveParams = {
        AttributeNames: ["SentTimestamp"],
        MaxNumberOfMessages: 10, // Max number of messages in one request (max 10)
        MessageAttributeNames: ["All"],
        QueueUrl: sqsUri,
        VisibilityTimeout: 20, // Hide read messages for n seconds
        WaitTimeSeconds: 0
    };

    sqs = client;
    params = receiveParams;

    pollMessage();
}
/**
 * Receives one batch of messages and keeps polling until a receive comes back
 * without messages, at which point the final processing is started.
 *
 * Every non-empty batch increments the counter `i` and hands each message body
 * to parseMessages(). A receive error is retried after a short delay instead of
 * immediately, and only a bounded number of times in a row, so a persistent
 * failure (for example invalid credentials) no longer spins in a hot loop
 * forever. After the last failed retry the final processing still runs so the
 * output stream is closed and whatever was collected gets reported.
 *
 * @param {number} [retryCount=0] Number of consecutive failed receives that
 *     preceded this call; callers outside this function should omit it.
 */
function pollMessage(retryCount = 0) {
    const MAX_RETRIES = 5;
    const RETRY_DELAY_MS = 1000;

    sqs.receiveMessage(params, function (err, data) {
        if (err) {
            console.log("Receive Error", err);
            if (retryCount >= MAX_RETRIES) {
                console.log("Giving up after %d consecutive receive errors", retryCount + 1);
                finalProcessing();
                return;
            }
            setTimeout(function () {
                pollMessage(retryCount + 1);
            }, RETRY_DELAY_MS);
        } else if (data.Messages && data.Messages.length > 0) {
            i++;
            data.Messages.forEach(function (message) {
                parseMessages(message.Body);
            });

            // Uncomment for deleting messages (removes every message of the batch)
            // var deleteParams = {
            //     QueueUrl: sqsUri,
            //     Entries: data.Messages.map(function (message, index) {
            //         return { Id: String(index), ReceiptHandle: message.ReceiptHandle };
            //     })
            // };
            // sqs.deleteMessageBatch(deleteParams, function (err, data) {
            //     if (err) {
            //         console.log("Delete Error", err);
            //     } else {
            //         console.log("Message Deleted", data);
            //     }
            // });

            // A successful receive resets the consecutive-error count.
            pollMessage();
        } else {
            // No (or an empty) Messages list: nothing is currently readable.
            console.log("Finished, i = %d", i);
            finalProcessing();
            return;
        }
        console.log("i = %d", i);
    });
}
/**
 * Parses one SQS message body and appends every record it contains to the
 * poll file, one JSON line per record.
 *
 * The body is normally a JSON array of records. A body holding a single
 * non-array JSON value is written as one record. A body that is not valid JSON
 * is logged and skipped, so one malformed message cannot abort the whole
 * polling run.
 *
 * @param {string} message Raw message body as received from the queue.
 */
function parseMessages(message) {
    let json;
    try {
        json = JSON.parse(message);
    } catch (err) {
        console.log("Parse Error, message skipped", err);
        return;
    }

    const records = Array.isArray(json) ? json : [json];
    records.forEach(function (object) {
        appendMessage(JSON.stringify(object));
    });
}
/**
 * Writes a single message to the poll file as its own line.
 *
 * @param {string} message Serialized message without a trailing newline.
 */
function appendMessage(message) {
    const line = message + "\n";
    appendToStream(line);
}
/**
 * Hands a chunk of text to the poll-file write stream unchanged.
 *
 * @param {string} message Text to write; no newline is added here.
 */
function appendToStream(message) {
    const target = pollFileStream;
    target.write(message);
}
/**
 * Sorting and deleting non-unique messages, counting lines.
 *
 * The steps run strictly one after another: the poll stream is ended and
 * flushed first, then the poll file is sorted and de-duplicated into the sort
 * file, and only then are lines counted. Both the raw poll file and (when the
 * sort succeeded) the de-duplicated sort file are counted, so the number of
 * unique messages is actually reported.
 */
function finalProcessing() {
    // end() is asynchronous; the external tools must not read the file before
    // every buffered write has reached it.
    pollFileStream.end(function () {
        sort(pollFile, sortFile, function (error) {
            countLines(pollFile);
            if (error === null) {
                countLines(sortFile);
            }
        });
    });
}

/**
 * Wraps a value in single quotes for use as one POSIX shell argument.
 *
 * @param {string} value Text to quote (for example a file name with spaces).
 * @returns {string} The quoted text.
 */
function quoteForShell(value) {
    return "'" + String(value).replace(/'/g, "'\\''") + "'";
}

/**
 * Sorts the lines of a file and writes the unique lines to another file using
 * the external `sort` and `uniq` commands.
 *
 * @param {string} fileName File whose lines are sorted.
 * @param {string} resultFileName File that receives the sorted unique lines.
 * @param {function(?Error)} [callback] Called once the command has finished,
 *     with the exec error or null.
 */
function sort(fileName, resultFileName, callback) {
    const command = "sort " + quoteForShell(fileName) + " | uniq > " + quoteForShell(resultFileName);
    exec(command, function (error, stdout, stderr) {
        console.log(`${stdout}`);
        console.log(`${stderr}`);
        if (error !== null) {
            console.log(`sort uniq error: ${error}`);
        }
        if (typeof callback === "function") {
            callback(error);
        }
    });
}

/**
 * Prints the `wc` statistics (lines, words, bytes) of a file.
 *
 * @param {string} file File to count.
 */
function countLines(file) {
    exec("wc " + quoteForShell(file), function (error, stdout, stderr) {
        console.log(`${stdout}`);
        console.log(`${stderr}`);
        if (error !== null) {
            console.log(`wc error: ${error}`);
        }
    });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment