Skip to content

Instantly share code, notes, and snippets.

@kfreytag
Created November 24, 2015 01:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kfreytag/b8fc477af292c623bba6 to your computer and use it in GitHub Desktop.
Save kfreytag/b8fc477af292c623bba6 to your computer and use it in GitHub Desktop.
JAWS-based Node callback to retrieve and persist Sailthru Jobs to S3
/**
* AWS Module: Action: Modularized Code
*/
var Promise = require('bluebird');
var AWS = require('aws-sdk');
var request = require('request');
var SAILTHRU_CONSTANTS = require('../lib/constants');
// AWS service clients, created once when the module is loaded.
// NOTE(review): `s3` is only referenced by the promisifyAll call below in this
// file; the uploader is given its own separate S3 client.
var s3 = new AWS.S3();
var dynamodb = new AWS.DynamoDB();
var sns = new AWS.SNS();
// Streaming uploader from s3-upload-stream, constructed around a fresh S3 client.
var s3Stream = require('s3-upload-stream')(new AWS.S3());
// Bluebird's promisifyAll is applied to each object's prototype. The functions
// below rely on the resulting promise-returning methods: getItemAsync,
// updateItemAsync and publishAsync.
Promise.promisifyAll(Object.getPrototypeOf(s3));
Promise.promisifyAll(Object.getPrototypeOf(dynamodb));
Promise.promisifyAll(Object.getPrototypeOf(sns));
// NOTE(review): the uploader is only used through events (`upload.on(...)`) in
// this file, never through an *Async method. Confirm which prototype this call
// actually modifies and whether it is needed.
Promise.promisifyAll(Object.getPrototypeOf(s3Stream));
/**
 * Looks up the brand associated with a Sailthru job.
 *
 * Reads the `BrandId` attribute of the item keyed by `event.job_id` from the
 * DynamoDB table named in `process.env.SAILTHRU_JOBS_DYNAMODB_TABLE`.
 *
 * @param {Object} event - Callback payload; `event.job_id` is used as the
 *   `SailthruJobId` key.
 * @returns {Promise} Resolves with `[event, brandId]`. Rejects with `event`
 *   itself when the job (or its `BrandId` attribute) is not found, and with
 *   the underlying error when the lookup fails.
 */
var getBrandId = function(event) {
  // Chain off a resolved promise so that a synchronous throw while building
  // the query (for example when `event` is undefined) becomes a rejection.
  return Promise.resolve().then(function() {
    var queryParams = {
      TableName: process.env.SAILTHRU_JOBS_DYNAMODB_TABLE,
      Key: { SailthruJobId: { S: event.job_id }},
      ProjectionExpression: "BrandId"
    };
    // Returning the lookup promise (instead of wrapping it in `new Promise`)
    // lets DynamoDB failures reach the caller rather than leaving the
    // returned promise pending forever.
    return dynamodb.getItemAsync(queryParams);
  }).then(function(result) {
    var item = result && result.Item;
    if (!item || !item.BrandId) {
      // Same rejection value as before for "not found": the event itself.
      return Promise.reject(event);
    }
    var brandId = item.BrandId.S;
    console.log("BrandId is " + brandId);
    return [event, brandId];
  });
};
/**
 * Streams the Sailthru export file into S3.
 *
 * Downloads `event.export_url` with `request` and pipes the response body into
 * an s3-upload-stream upload whose bucket comes from `getBucketName` and whose
 * key is `event.filename`.
 *
 * @param {Array} result - `[event, brandId]` as produced by `getBrandId`.
 * @returns {Promise} Resolves with `[event, file, s3Bucket, s3Key]` once the
 *   upload emits `uploaded`. Rejects when the download errors, when the
 *   download answers with a non-2xx HTTP status, or when the upload errors.
 */
var copyExportFileToBucket = function(result) {
  return new Promise(function (resolve, reject) {
    var event = result[0];
    var brandId = result[1];
    var file = event.export_url;
    var s3Key = event.filename;
    var s3Bucket = getBucketName(brandId, event.job_id);
    console.log("Uploading " + file);

    var download = request(file);

    // pipe() does not forward source errors to the destination, so without
    // this listener a network failure would never settle the promise.
    download.on('error', reject);

    download.on('response', function(response) {
      var status = response.statusCode;
      if (status < 200 || status >= 300) {
        // Do not store an HTTP error body in S3 as if it were the export.
        download.abort();
        reject(new Error('Sailthru export download failed with HTTP status ' + status));
        return;
      }
      try {
        // The upload is created only once the download is known to be good,
        // so a failed download never starts an upload.
        var upload = s3Stream.upload({
          "Bucket": s3Bucket,
          "Key": s3Key
        });
        upload.on('error', reject);
        upload.on('uploaded', function() {
          resolve([event, file, s3Bucket, s3Key]);
        });
        download.pipe(upload);
      } catch (setupError) {
        // This runs inside an event handler, outside the promise executor's
        // implicit try/catch, so synchronous failures are forwarded by hand.
        reject(setupError);
      }
    });
  });
};
/**
 * Records the retrieved export on the job's DynamoDB item.
 *
 * Sets `SailthruExportFile`, `S3Bucket`, `S3Key` and `JobStatus` (to
 * `SAILTHRU_CONSTANTS.SAILTHRU_JOB_STATUSES.EXPORT_RETRIEVED`) on the item
 * keyed by `event.job_id`, conditional on that `SailthruJobId` matching.
 *
 * @param {Array} result - `[event, sailthruFile, s3Bucket, s3Key]` as produced
 *   by `copyExportFileToBucket`.
 * @returns {Promise} Resolves with `event`. Rejects with `event` itself when
 *   the update response carries no `Attributes`, and with the underlying error
 *   when the update fails.
 */
var updateDynamoDbJob = function(result) {
  console.log("Updating dynamo Job");
  // Chain off a resolved promise so that a synchronous throw while unpacking
  // `result` becomes a rejection.
  return Promise.resolve().then(function() {
    var event = result[0];
    var sailthruFile = result[1];
    var s3BucketPath = result[2];
    var s3Key = result[3];
    var sailthruJobId = event.job_id;
    var updateParams = {
      TableName: process.env.SAILTHRU_JOBS_DYNAMODB_TABLE,
      Key: { SailthruJobId: { 'S' : sailthruJobId }},
      UpdateExpression: "set SailthruExportFile = :val1, S3Bucket = :val2, S3Key = :val3, JobStatus = :val4",
      ConditionExpression: "SailthruJobId = :con1",
      ExpressionAttributeValues: {
        ":val1": {"S" : sailthruFile},
        ":val2": {"S" : s3BucketPath},
        ":val3": {"S" : s3Key},
        ":val4": {"S" : SAILTHRU_CONSTANTS.SAILTHRU_JOB_STATUSES.EXPORT_RETRIEVED },
        ":con1": {"S" : sailthruJobId}
      },
      ReturnValues: "ALL_NEW"
    };
    // Returning the update promise lets failures (such as a failed condition
    // check) reach the caller rather than leaving the returned promise
    // pending forever.
    return dynamodb.updateItemAsync(updateParams).then(function(updated) {
      if (updated && 'Attributes' in updated) {
        console.log('dynamo db job updated');
        return event;
      }
      // Same rejection value as before for an empty response: the event.
      return Promise.reject(event);
    });
  });
};
/**
 * Announces on SNS that the Sailthru callback has been processed.
 *
 * Publishes a fixed message to the topic named in
 * `process.env.SNS_SAILTHRU_CALLBACK_ARN`, carrying `event.job_id` as the
 * `SailthruJobId` message attribute.
 *
 * @param {Object} event - Callback payload; `event.job_id` is sent as a
 *   message attribute.
 * @returns {Promise} Resolves with the SNS publish response. Rejects with the
 *   underlying error after logging it.
 */
var publishSnsSuccessMessage = function(event) {
  console.log("Starting SNS publish");
  // Chain off a resolved promise so that a synchronous throw while building
  // the parameters becomes a rejection.
  return Promise.resolve().then(function() {
    var params = {
      Message: "Callback returned from Sailthru",
      MessageAttributes: {
        SailthruJobId: {
          DataType: "String",
          StringValue: event.job_id
        }
      },
      TopicArn: process.env.SNS_SAILTHRU_CALLBACK_ARN
    };
    return sns.publishAsync(params);
  }).then(function(result) {
    console.log('SNS message published');
    return result;
  }).catch(function(err) {
    // `.catch` handles every rejection; the previous `.error` handler only
    // saw Bluebird operational errors, so anything else left the returned
    // promise pending.
    console.log(err);
    throw err;
  });
};
/**
 * Builds the S3 destination path for a job's export.
 *
 * The result has the form
 * `<S3_INPUTS_BUCKET>/<JAWS_STAGE>/<S3_SAILTHRU_BUCKET>/exports/lists/<brandId>/<sailthruJobId>`,
 * with the first three segments read from `process.env`.
 *
 * @param {*} brandId - Brand segment of the path.
 * @param {*} sailthruJobId - Job segment of the path.
 * @returns {string} The concatenated path.
 */
var getBucketName = function(brandId, sailthruJobId) {
  var env = process.env;
  // Plain `+` concatenation is kept on purpose: an unset variable or argument
  // shows up as the text "undefined" rather than an empty segment.
  var root = env.S3_INPUTS_BUCKET + '/' + env.JAWS_STAGE + '/' + env.S3_SAILTHRU_BUCKET;
  var jobSuffix = brandId + '/' + sailthruJobId;
  return root + '/exports/lists/' + jobSuffix;
};
module.exports = function(event) {
return getBrandId(event)
.then(copyExportFileToBucket)
.then(updateDynamoDbJob)
.then(publishSnsSuccessMessage);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment