Skip to content

Instantly share code, notes, and snippets.

@jlis
Created May 18, 2018 08:49
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jlis/6d3acd78a04ef202b16c59bb3fc833b2 to your computer and use it in GitHub Desktop.
Save jlis/6d3acd78a04ef202b16c59bb3fc833b2 to your computer and use it in GitHub Desktop.
AWS Lambda function to save events from SQS into DynamoDB
var AWS = require('aws-sdk');
// SQS client; its region is read from the AWS_REGION environment variable.
var sqs = new AWS.SQS({ region: process.env.AWS_REGION });

// DynamoDB client created without explicit options.
var dynamodb = new AWS.DynamoDB();

// Remaining-time threshold in milliseconds that handleSQSMessages compares
// against context.getRemainingTimeInMillis(). When taken from the environment
// the value is a string; the default is the number 20000.
var REPEAT_THRESHOLD = process.env.REPEAT_THRESHOLD
  ? process.env.REPEAT_THRESHOLD
  : 20000;
/**
 * Receives one batch of messages from the queue named by TASK_QUEUE_URL.
 *
 * The batch size comes from the MAX_TASKS environment variable. It is parsed
 * as an integer and limited to the 1..10 range that SQS accepts for
 * MaxNumberOfMessages; a missing, non-numeric or non-positive value falls
 * back to 10.
 *
 * @param {function(Error=, Array=)} callback - Called with the error on
 *   failure, otherwise with `null` and the array of received messages. The
 *   array is empty when the response carries no `Messages` field.
 */
function receiveSQSMessages(callback) {
  var SQS_BATCH_LIMIT = 10;
  var requested = parseInt(process.env.MAX_TASKS, 10);
  // `NaN >= 1` is false, so unparsable values also take the default branch.
  var batchSize = requested >= 1 ? Math.min(requested, SQS_BATCH_LIMIT) : SQS_BATCH_LIMIT;

  var params = {
    QueueUrl: process.env.TASK_QUEUE_URL,
    MaxNumberOfMessages: batchSize,
    // NOTE(review): messages stay hidden for only 5 seconds; presumably the
    // write + delete is expected to finish within that window - confirm.
    VisibilityTimeout: 5,
    // 0 = do not wait for messages to arrive.
    WaitTimeSeconds: 0
  };

  sqs.receiveMessage(params, function(err, data) {
    if (err) {
      console.error(err, err.stack);
      callback(err);
      return;
    }
    // SQS omits `Messages` when nothing was received; normalize to [].
    callback(null, (data && data.Messages) || []);
  });
}
/**
 * Writes one DynamoDB item per received SQS message into the table named by
 * the DYNAMODB_TABLE environment variable, then hands the messages to
 * deleteSQSMessages so they are removed from the queue.
 *
 * NOTE(review): every item is the same hard-coded placeholder; the message
 * contents are not read. Presumably real code would build the item from the
 * message body - confirm.
 *
 * @param {Array} rawMessages - Messages as returned by SQS receiveMessage.
 * @param {function(Error=)} callback - Called with `null` when there is
 *   nothing to write, and with an error when the batch write fails or
 *   DynamoDB reports unprocessed items. After a fully successful write the
 *   callback is passed on to deleteSQSMessages.
 */
function writeEvents(rawMessages, callback) {
  var putRequests = (rawMessages || []).map(function() {
    return {
      PutRequest: {
        Item: {
          "user_id": {
            N: "123"
          },
          "account_id": {
            N: "456"
          },
          "type": {
            S: "some_event"
          }
        }
      }
    };
  });

  if (putRequests.length === 0) {
    // Nothing to write: still signal completion instead of going silent.
    callback(null);
    return;
  }

  // The table name is dynamic, so it must be assigned as a key; writing
  // `process.env.DYNAMODB_TABLE: ...` inside an object literal is a syntax error.
  var requestItems = {};
  requestItems[process.env.DYNAMODB_TABLE] = putRequests;

  dynamodb.batchWriteItem({
    RequestItems: requestItems
  }, function(err, data) {
    if (err) {
      console.error(err, err.stack);
      callback(err);
      return;
    }

    // A successful response can still list items that were not written.
    // Deleting the SQS messages in that case would lose those events, so
    // report an error and leave the messages on the queue.
    var unprocessed = (data && data.UnprocessedItems) || {};
    var hasUnprocessed = Object.keys(unprocessed).some(function(table) {
      return unprocessed[table] && unprocessed[table].length > 0;
    });
    if (hasUnprocessed) {
      var incomplete = new Error('DynamoDB batchWriteItem returned unprocessed items');
      console.error(incomplete, incomplete.stack);
      callback(incomplete);
      return;
    }

    deleteSQSMessages(rawMessages, callback);
  });
}
/**
 * Deletes the given messages from the queue named by TASK_QUEUE_URL in one
 * deleteMessageBatch call.
 *
 * @param {Array} messages - Messages as returned by SQS receiveMessage; each
 *   contributes its `MessageId` and `ReceiptHandle`.
 * @param {function(Error=)} callback - Called with `null` once every entry
 *   has been deleted (or when there was nothing to delete), and with an error
 *   when the request fails or SQS reports failed entries. In the latter case
 *   the failed entries are attached to the error as `failed`.
 */
function deleteSQSMessages(messages, callback) {
  var entries = (messages || []).map(function(message) {
    return {
      Id: message.MessageId,
      ReceiptHandle: message.ReceiptHandle
    };
  });

  if (entries.length === 0) {
    // Nothing to delete: still signal completion to the caller.
    callback(null);
    return;
  }

  sqs.deleteMessageBatch({
    Entries: entries,
    QueueUrl: process.env.TASK_QUEUE_URL
  }, function(err, data) {
    if (err) {
      console.error(err, err.stack);
      callback(err);
      return;
    }

    // The batch call can succeed as a whole while individual entries fail;
    // those are listed in `Failed` and must not be treated as deleted.
    var failed = (data && data.Failed) || [];
    if (failed.length > 0) {
      var partial = new Error(
        'Failed to delete ' + failed.length + ' of ' + entries.length + ' SQS messages'
      );
      partial.failed = failed;
      console.error(partial, failed);
      callback(partial);
      return;
    }

    callback(null);
  });
}
/**
 * Receives one batch of SQS messages, starts writing them via writeEvents and
 * reports a status string through `callback`.
 *
 * Status values:
 *   - 'DONE'  when no messages were received, or when the remaining execution
 *             time is above REPEAT_THRESHOLD;
 *   - 'PAUSE' when messages were received and the remaining time is at or
 *             below REPEAT_THRESHOLD.
 *
 * `callback` is invoked at most once. The status is reported right after
 * writeEvents has been started, not after it has finished; a write error that
 * arrives after the status was reported is therefore not forwarded.
 *
 * @param {Object} context - Must provide getRemainingTimeInMillis().
 * @param {function(Error=, string=)} callback - Receives the error or status.
 */
function handleSQSMessages(context, callback) {
  var reported = false;

  // Forwards only the first outcome so the caller never sees two results.
  function report(err, status) {
    if (reported) {
      return;
    }
    reported = true;
    callback(err, status);
  }

  receiveSQSMessages(function(err, messages) {
    if (err) {
      console.error(err, err.stack);
      report(err);
      // Stop here: without this return the code below would also report 'DONE'.
      return;
    }

    if (!messages || messages.length === 0) {
      report(null, 'DONE');
      return;
    }

    // Only failures are forwarded from the write; the status below is the
    // success result of this function.
    writeEvents(messages, function(writeErr) {
      if (writeErr) {
        report(writeErr);
      }
    });

    var remaining = context.getRemainingTimeInMillis();
    console.log(remaining);
    if (remaining > REPEAT_THRESHOLD) {
      // NOTE(review): the log suggests another batch was meant to be
      // processed here, but no re-run is implemented - confirm intent.
      console.log('run again?');
      report(null, 'DONE');
    } else {
      report(null, 'PAUSE');
    }
  });
}
/**
 * Exported handler function: delegates to handleSQSMessages.
 *
 * @param {Object} event - Not used by this handler.
 * @param {Object} context - Passed through to handleSQSMessages, which calls
 *   getRemainingTimeInMillis() on it.
 * @param {function} callback - Passed through to handleSQSMessages, which
 *   invokes it with an error or a status string.
 */
exports.handler = function(event, context, callback) {
handleSQSMessages(context, callback);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment