@schmod
Created August 1, 2017 18:20
Reprocessing stuck AWS Lambda Redshift batches
/*
This is a quick-and-dirty script to reprocess files that have been partially
processed by aws-lambda-redshift-loader, if an error has left entries in your
LambdaRedshiftProcessedFiles DynamoDB table that don't have corresponding
entries in the LambdaRedshiftBatches table (and therefore cannot be easily
reprocessed by any of the tools bundled with aws-lambda-redshift-loader).

You'll need to enter a few configuration details about your AWS account and
S3 bucket, and run query.js to generate a list of S3 keys that you'd like to
retry.

This script deletes the erroneous entry in LambdaRedshiftProcessedFiles, and
sends a fake ObjectCreated event to the Lambda function, to "replay" the
sequence of events that should have taken place when your file was first
stored in S3.

PLEASE READ THIS SCRIPT AND UNDERSTAND WHAT IT DOES BEFORE YOU RUN IT.
USE AT YOUR OWN RISK.

See https://github.com/awslabs/aws-lambda-redshift-loader/issues/126 for
additional context.
*/
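/*
Typical run, as a sketch (the filenames "query-output.txt" and "reprocess.js"
are assumptions, not part of the gist -- use whatever you named this script
and its input file):

  node query.js > query-output.txt
  # review query-output.txt by hand, then point
  # LIST_OF_FILES_YOU_WANT_TO_REPROCESS below at it
  node reprocess.js
*/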
const AWS = require('aws-sdk');
const Promise = require('bluebird');
const fs = require('fs');
const ChildProcess = require('child_process');
const BUCKET = 'INSERT_BUCKET_HERE';
const REGION = 'us-east-1';
const lambda = new AWS.Lambda({
  accessKeyId: 'INSERT_KEY_HERE',
  secretAccessKey: 'INSERT_KEY_HERE',
  region: REGION
});
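// Note (an alternative, not part of the original gist): omitting accessKeyId
// and secretAccessKey makes the SDK fall back to its default credential chain
// (environment variables, ~/.aws/credentials, or an instance role), which
// avoids hardcoding secrets:
//
//   const lambda = new AWS.Lambda({ region: REGION });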
// Create a file listing all of the S3 keys that you would like to reprocess, one key per line.
// query.js (in this gist) can create this list for you.
// You should probably manually review its output before running this, though!
const keys = fs.readFileSync('LIST_OF_FILES_YOU_WANT_TO_REPROCESS', 'utf8')
  .split('\n')
  .filter((key) => key.trim().length > 0); // ignore blank lines (e.g. a trailing newline)
// Replay each key sequentially so the Lambda function isn't flooded.
// NB: keys are interpolated into shell commands, so this assumes keys without
// spaces or shell metacharacters.
const run = Promise.mapSeries(keys, (key) => {
  return new Promise((resolve, reject) => {
    // Skip files that already belong to a batch; those can be reprocessed
    // with the tools bundled with aws-lambda-redshift-loader.
    const existing = JSON.parse(ChildProcess.execSync(`node processedFiles.js --region ${REGION} --query --file ${key}`));
    if (existing.batchId) {
      console.warn(`KEY ${key} already has a batch.`);
      resolve();
      return;
    }

    // Delete the stale entry so the loader treats the file as brand-new.
    console.log(ChildProcess.execSync(`node processedFiles.js --region ${REGION} --delete --file ${key}`).toString('utf8'));

    // Replay the ObjectCreated event that S3 should have delivered originally.
    const simulatedS3Event = getSimulatedS3Event(key, BUCKET);
    lambda.invoke({
      FunctionName: 'csv2redshift',
      InvocationType: 'RequestResponse',
      LogType: 'Tail',
      Payload: JSON.stringify(simulatedS3Event),
      Qualifier: '$LATEST'
    }, (err, data) => {
      if (err) {
        console.error(err);
        reject(err);
      } else {
        // LogType: 'Tail' returns the tail of the invocation log, base64-encoded.
        const msg = Buffer.from(data.LogResult, 'base64').toString('utf8');
        console.log(msg);
        resolve(data);
      }
    });
  });
});
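// Optional completion handler (an addition, not part of the original gist):
// Promise.mapSeries rejects on the first failure, so this logs a summary on
// success and sets a non-zero exit code if any replay failed.
run
  .then((results) => console.log(`Done -- attempted ${results.length} key(s).`))
  .catch((err) => {
    console.error('Replay aborted:', err);
    process.exitCode = 1;
  });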
// Builds a minimal S3 "ObjectCreated:Put" event for the given key/bucket,
// mirroring the shape of a real S3 notification.
function getSimulatedS3Event (key, bucket) {
  return {
    "Records": [
      {
        "eventVersion": "2.0",
        "eventTime": "1970-01-01T00:00:00.000Z",
        "requestParameters": {
          "sourceIPAddress": "127.0.0.1"
        },
        "s3": {
          "configurationId": "testConfigRule",
          "object": {
            "eTag": "0123456789abcdef0123456789abcdef",
            "sequencer": "0A1B2C3D4E5F678901",
            "key": key.trim(),
            "size": 1024
          },
          "bucket": {
            "arn": "arn:aws:s3:::" + bucket,
            "name": bucket,
            "ownerIdentity": {
              "principalId": "EXAMPLE"
            }
          },
          "s3SchemaVersion": "1.0"
        },
        "responseElements": {
          "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
          "x-amz-request-id": "EXAMPLE123456789"
        },
        "awsRegion": "us-east-1",
        "eventName": "ObjectCreated:Put",
        "userIdentity": {
          "principalId": "EXAMPLE"
        },
        "eventSource": "aws:s3"
      }
    ]
  };
}
// ---------------------------------------------------------------------------
// query.js -- prints the S3 keys of entries in LambdaRedshiftProcessedFiles
// that have no batchId (i.e. the candidates for reprocessing), one per line.
// ---------------------------------------------------------------------------
const ChildProcess = require('child_process');
/*
This can take a while, depending on the provisioned capacity of your table.
--page-size has deliberately been set to a very conservative value so that
the scan doesn't consume too much read capacity per request.
*/
const response = JSON.parse(ChildProcess.execSync(`aws dynamodb scan --table-name LambdaRedshiftProcessedFiles --filter-expression "attribute_not_exists(batchId)" --total-segments 1 --segment 0 --page-size 100`));
// Each item's loadFile attribute holds the S3 key of the source file.
const keys = response.Items.map((item) => item.loadFile.S).sort();
keys.forEach((key) => console.log(key));
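// Alternative sketch (an assumption, not part of the original gist): the same
// scan via the aws-sdk instead of shelling out to the AWS CLI, following
// LastEvaluatedKey so that tables larger than one page are fully covered.
// The hardcoded region mirrors the main script's REGION constant.
const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB({ region: 'us-east-1' });

function scanUnbatchedKeys(startKey, acc) {
  acc = acc || [];
  return dynamodb.scan({
    TableName: 'LambdaRedshiftProcessedFiles',
    FilterExpression: 'attribute_not_exists(batchId)',
    Limit: 100, // conservative page size, mirroring --page-size above
    ExclusiveStartKey: startKey
  }).promise().then((page) => {
    page.Items.forEach((item) => acc.push(item.loadFile.S));
    return page.LastEvaluatedKey
      ? scanUnbatchedKeys(page.LastEvaluatedKey, acc)
      : acc;
  });
}

scanUnbatchedKeys().then((allKeys) => allKeys.sort().forEach((key) => console.log(key)));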