Skip to content

Instantly share code, notes, and snippets.

@LeonanCarvalho
Created April 19, 2024 12:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LeonanCarvalho/48f42162321c2d1b236f64c6536839f7 to your computer and use it in GitHub Desktop.
Save LeonanCarvalho/48f42162321c2d1b236f64c6536839f7 to your computer and use it in GitHub Desktop.
AWS - FIFO SQS Consumer in Batch
const AWS = require('aws-sdk');
const SQS = new AWS.SQS({ region: 'us-east-1' });
const { Parallel } = require('async'); // npm install async
// Queue identifier sent as `QueueUrl` on every receive call.
// NOTE(review): 'SQS_URN' looks like a placeholder — replace it with the real queue URL before running.
const queueUrl = 'SQS_URN';
// Upper bound on messages requested per receive call (sent as `MaxNumberOfMessages`).
const batchSize = 10;
/**
 * Placeholder handler for one batch of messages that belong to the same group.
 *
 * Currently it only logs the batch size followed by each message body, in
 * order. Replace the body with the real processing logic.
 *
 * @param {Array<{Body: string}>} messages - Messages of a single group.
 * @returns {Promise<void>} Resolves once the batch has been handled.
 */
async function processBatch(messages) {
  const count = messages.length;
  console.log('Batch size', count);

  // Every message of this group is available here, in the order it was received.
  for (const message of messages) {
    console.log(message.Body);
  }
}
/**
 * Polls the queue once and returns whatever messages were delivered.
 *
 * Requests up to `batchSize` messages from `queueUrl`, asking for the
 * `MessageGroupId` system attribute and all message attributes. Any failure is
 * logged and reported to the caller as an empty list rather than thrown.
 *
 * @returns {Promise<Array<object>>} The received messages, or `[]` when the
 *   response carries none or the call failed.
 */
async function receiveMessages() {
  let received;

  try {
    const response = await SQS.receiveMessage({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: batchSize,
      AttributeNames: ['MessageGroupId'],
      MessageAttributeNames: ['All'],
      VisibilityTimeout: 10,
      WaitTimeSeconds: 20
    }).promise();
    received = response.Messages;
  } catch (receiveError) {
    // Best effort: report the problem and let the caller poll again.
    console.error('Erro while receiving messages:', receiveError);
  }

  return received || [];
}
// Do the dirty work
/**
 * Splits a list of received messages into per-group batches.
 *
 * The group id is looked up in `message.Attributes.MessageGroupId` (where the
 * SQS receive response places requested system attributes), then in a
 * top-level `message.MessageGroupId` for backward compatibility, and finally
 * falls back to the literal group `'default'`.
 *
 * A `Map` is used instead of a plain object so that an id such as
 * `__proto__` cannot collide with inherited object properties.
 *
 * @param {Array<object>} messages - Messages as returned by `receiveMessages`.
 * @returns {Array<Array<object>>} One array per group, in first-seen order;
 *   messages inside a group keep their received order.
 */
function groupByMessageGroupId(messages) {
  const groups = new Map();
  for (const message of messages) {
    const groupId =
      (message.Attributes && message.Attributes.MessageGroupId) ||
      message.MessageGroupId ||
      'default';
    const bucket = groups.get(groupId);
    if (bucket) {
      bucket.push(message);
    } else {
      groups.set(groupId, [message]);
    }
  }
  return [...groups.values()];
}

/**
 * Consumer loop: polls the queue forever, groups each poll's messages by
 * message group and hands every group to `processBatch` concurrently.
 *
 * All groups of one poll must finish before the next poll starts. If any
 * `processBatch` call rejects, the rejection propagates and ends the loop.
 *
 * NOTE(review): nothing in this loop deletes the messages. Unless
 * `processBatch` deletes them, they will presumably be redelivered once the
 * visibility timeout expires — confirm.
 *
 * @returns {Promise<never>} Never resolves; rejects on the first processing
 *   error.
 */
async function processMessages() {
  while (true) {
    const messages = await receiveMessages();
    if (messages.length === 0) continue;

    // One task per group; a poll returns at most `batchSize` messages, so no
    // extra concurrency limit is needed.
    const batches = groupByMessageGroupId(messages);
    await Promise.all(batches.map((groupMessages) => processBatch(groupMessages)));
  }
}
// Start the consumer loop; any error that escapes it is logged here.
processMessages().catch((fatalError) => {
  console.error('Erro reading mensagens:', fatalError);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment