Created February 3, 2021 23:34 — Save simone-coelho/4c0472a41ad836836ea53ddd6290da1f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dependencies
let axios = require("axios");
let argv = require("yargs").argv;
let s3 = require("@auth0/s3");
let fs = require("fs");
let path = require("path");
// let parquet = require('node-parquet');
let AWS = require("aws-sdk");

// Directory of the entry-point script; downloads are stored beneath it.
let appDir = path.dirname(require.main.filename);

// Accumulators for the local paths of downloaded decision/event files.
let decisionsFiles = [];
let eventsFiles = [];

let aws_S3 = new AWS.S3();
let s3Client = undefined; // lazily created by getS3Client()
let account_id = ""; // Optimizely account id (may be overridden via --account_id)
let _authToken_ = "API Token Goes Here"; // Optimizely personal access token

const destinationFolder = "./data";
const authEndpoint = "https://api.optimizely.com/v2/export/credentials";

// Escapes regex metacharacters so a search term can be embedded in a RegExp.
const escapeRegExp = (str) =>
  str.replace(/[\-\[\]\/\{\}\(\)\*\+\?\.\\\^\$\|]/g, "\\$&");

let validCredentials = false;

// Short-lived AWS credentials returned by the Optimizely export API.
let awsCredentials = {
  AWS_ACCESS_KEY_ID: "",
  AWS_SECRET_ACCESS_KEY: "",
  AWS_SESSION_TOKEN: "",
  AWS_SESSION_EXPIRATION: "",
  S3_BASE_PATH: "",
};
/**
 * Validates the Optimizely personal access token.
 * @param {string} token - Personal access token used against the export API.
 * @param {*} sessionExpiration - Currently unused; reserved for expiry checks.
 * @returns {true|{error: string}} true when a token is present, otherwise an
 *   object describing the problem.
 */
function validateCredentials(token, sessionExpiration) {
  // BUG FIX: removed `result = false`, a dead assignment to an implicit
  // global (a ReferenceError in strict mode).
  if (!token) {
    return { error: "Invalid authentication token." };
  }
  return true;
}
/**
 * Validates a date string in yyyy-mm-dd format, including day-of-month
 * range checks and leap-year handling for February.
 * @param {string} dateString - Candidate date, e.g. "2020-12-04".
 * @returns {boolean} true when the string is a valid calendar date.
 */
function validateDate(dateString) {
  // Anchored so leading/trailing garbage is rejected (the original pattern
  // matched anywhere inside the string).
  const dateformat = /^(\d{4})-(\d{2})-(\d{2})$/;
  if (!dateString || !dateString.match(dateformat)) {
    console.log("Invalid date format!");
    return false;
  }
  // yyyy-mm-dd → [year, month, day]. BUG FIX: the original assigned the
  // split to a misspelled `pdatepart`, so every part parsed to NaN and any
  // dddd-dd-dd string was accepted; it also read the parts in m/d/y order.
  const datepart = dateString.split("-");
  const year = parseInt(datepart[0], 10);
  const month = parseInt(datepart[1], 10);
  const day = parseInt(datepart[2], 10);
  if (month < 1 || month > 12 || day < 1) {
    console.log("Invalid date format!");
    return false;
  }
  // Days per month for a non-leap year.
  const ListofDays = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
  if (month === 2) {
    // Leap years: divisible by 4 and not by 100, or divisible by 400.
    const leapYear = (!(year % 4) && year % 100 !== 0) || !(year % 400);
    if (day > (leapYear ? 29 : 28)) {
      console.log("Invalid date format!");
      return false;
    }
  } else if (day > ListofDays[month - 1]) {
    // Day is out of range for this month.
    return false;
  }
  return true;
}
/**
 * Builds a case-insensitive predicate that matches objects whose own
 * enumerable property values contain the given term.
 * @param {string} term - Search term (regex metacharacters are escaped).
 * @returns {(candidate: Object) => boolean} predicate for use with filter().
 */
const filterBy = (term) => {
  const pattern = new RegExp(escapeRegExp(term), "i");
  // Own enumerable keys only — equivalent to the classic
  // for..in + hasOwnProperty guard.
  return (candidate) =>
    Object.keys(candidate).some((key) => pattern.test(candidate[key]));
};
// Default HTTP headers for Optimizely REST API calls (bearer-token auth).
const httpHeaders = {
  headers: { Authorization: `Bearer ${_authToken_}` },
};
/**
 * Reports whether any enumerable property of the object is falsy/empty.
 * @param {Object} _object_ - Record to inspect.
 * @returns {boolean} true as soon as one value is falsy (0, "", null, …).
 */
const emptyValues = function (_object_) {
  for (const key in _object_) {
    const value = _object_[key];
    if (!value || value === "") {
      return true;
    }
  }
  return false;
};
/**
 * Deep-copies a JSON-safe object via a JSON round-trip.
 * @param {Object} obj - Source object; falsy inputs yield undefined.
 * @returns {Object|undefined} an independent copy, or undefined.
 */
function deepClone(obj) {
  if (!obj) {
    return undefined;
  }
  // JSON round-trip drops functions/undefined and mangles Dates, but the
  // records cloned here are plain JSON-safe data.
  return JSON.parse(JSON.stringify(obj));
}
// Path/error accumulators shared across the path-building helpers below.
let dateRangeList = [];
let decisionPaths = [];
let eventsPaths = [];
let mergedPathsList = [];
let argvErrors = [];
let s3PrefixList = [];

// Defaults used when the corresponding CLI argument is absent.
const _startDate = "2020-12-04";
const _endDate = "2020-12-06";
const s3_Bucket = "optimizely-events-data";
const pathTemplate = `s3://${s3_Bucket}/v1/account_id=${account_id}/`;
const validTypes = ["events", "decisions", "events_decisions"];

// Effective job configuration, merged from CLI arguments and defaults.
// BUG FIX: every `argvErrors.push[{...}]` below was an index expression (a
// silent no-op), so argument errors were never recorded; they are now real
// push() calls, emitted only when a supplied argument is actually invalid.
let _args_ = {
  type: "decisions",
  startDate: "",
  endDate: "",
  experiment: ["19546930211"],
  account_id: "",
};

// --type: which export tables to download (defaults to "decisions").
// (Removed a leftover `argv.type = 'decisions';` that mutated argv after
// the value had already been consumed.)
if (argv.type) {
  if (validTypes.includes(argv.type)) {
    _args_.type = argv.type; //events_decisions
  } else {
    argvErrors.push({ error: "Type is not valid." });
  }
}

// --bucket: S3 bucket holding the exports.
if (argv.bucket) {
  _args_.bucket = argv.bucket;
} else if (s3_Bucket) {
  _args_.bucket = s3_Bucket;
} else {
  // BUG FIX: message was a copy-pasted "Type is not valid.".
  argvErrors.push({ error: "Bucket is not valid." });
}

// --startDate / --endDate: yyyy-mm-dd range (inclusive).
if (argv.startDate && validateDate(argv.startDate)) {
  // BUG FIX: was assigned to `_args_.start`, so a user-supplied start date
  // was silently ignored by getDateList().
  _args_.startDate = argv.startDate;
} else if (_startDate) {
  _args_.startDate = _startDate;
} else {
  argvErrors.push({ error: "Start date is not valid." });
}
if (argv.endDate && validateDate(argv.endDate)) {
  _args_.endDate = argv.endDate;
} else if (_endDate) {
  _args_.endDate = _endDate;
} else {
  argvErrors.push({ error: "End date is not valid." });
}

// Simple pass-through options with defaults.
_args_.destinationFolder = argv.destinationFolder || destinationFolder;
_args_.removeFiles = argv.removeFiles || false;
_args_.database = argv.database || "";
_args_.connectionString = argv.connectionString || "";
_args_.userName = argv.userName || "";
_args_.password = argv.password || "";

// --experiment: one experiment id or a list of ids to restrict the download.
let experimentList = [];
if (argv.experiment) {
  if (Array.isArray(argv.experiment)) {
    experimentList = argv.experiment;
  } else {
    experimentList.push(argv.experiment);
  }
  if (experimentList.length > 0) {
    _args_.experimentList = [...experimentList];
  } else {
    argvErrors.push({ error: "The experiment list is not valid." });
  }
}

// --account_id: numeric Optimizely account id. Optional — when absent, the
// S3 base path issued by the credentials endpoint is used instead (see
// buildS3PathListByEventType). BUG FIX: the isNaN() checks were inverted,
// so numeric account ids were rejected and non-numeric ones accepted.
if (argv.account_id) {
  if (!isNaN(argv.account_id)) {
    _args_.account_id = argv.account_id;
  } else {
    argvErrors.push({ error: "Account is not valid." });
  }
} else if (account_id && !isNaN(account_id)) {
  _args_.account_id = account_id;
}
/**
 * Builds an inclusive list of ISO dates (yyyy-mm-dd) between two dates.
 * @param {string} start - Start date, yyyy-mm-dd.
 * @param {string} end - End date, yyyy-mm-dd (inclusive).
 * @returns {string[]|{error: string}} date strings, or an error object.
 */
function getDateList(start, end) {
  if (!start || !end) return { error: "Missing the start or end date." };
  if (!validateDate(start)) return { error: "Invalid start date." };
  if (!validateDate(end)) return { error: "Invalid end date." };
  const startDate = new Date(start);
  // BUG FIX: was `new Date(start)`, which made the range check below a
  // permanent no-op.
  const endDate = new Date(end);
  if (startDate > endDate) {
    return { error: "The start date cannot be greater than the end date." };
  }
  // Walk one day at a time from start to end, inclusive.
  const days = [];
  for (
    let dt = new Date(startDate);
    dt <= endDate;
    dt.setDate(dt.getDate() + 1)
  ) {
    days.push(new Date(dt));
  }
  if (days.length === 0) {
    return {
      error:
        "Unable to build a date range from the start and end date parameters.",
    };
  }
  return days.map((d) => d.toISOString().slice(0, 10));
}
/**
 * GETs a JSON resource using the Optimizely bearer-auth headers.
 * @param {string} url - Endpoint to fetch.
 * @returns {Promise<{response: *, status: number}|{error: string}|undefined>}
 *   payload + HTTP status on success, an error object for a missing URL, or
 *   undefined when the request itself failed (matching the original contract).
 */
async function getObjectFromUrl(url) {
  if (!url || url === "") {
    const result = {
      error: `Unable to get object at: ${url}. The URL endpoint is invalid or does not exist.`,
    };
    console.log(result.error);
    return result;
  }
  try {
    const res = await axios.get(url, httpHeaders);
    return { response: res.data, status: res.status };
  } catch (responseError) {
    // BUG FIX: the original catch logged an undeclared `error` variable,
    // which threw a ReferenceError and masked the real failure.
    console.log(`Unable to get object at: ${url}`, responseError);
    return undefined;
  }
}
/**
 * Builds the S3 path list for one event type ("decisions" or "events")
 * across a date range, optionally fanning out per experiment id.
 * Side effects: appends bucket-relative prefixes to the module-level
 * s3PrefixList and stores the result in decisionPaths / eventsPaths.
 * @param {string[]} dateRange - Dates in yyyy-mm-dd form.
 * @param {string} typeValue - "decisions" or "events".
 * @param {string[]} [experimentList] - Optional experiment ids.
 * @returns {string[]|{error: string}} the built paths, or an error object.
 */
function buildS3PathListByEventType(dateRange, typeValue, experimentList = []) {
  const datePaths = [];
  let resultArray = [];
  // Prefer the account-id template; otherwise fall back to the base path
  // issued by the credentials endpoint. BUG FIX: `_pathTemplate_` was an
  // implicit global — now a scoped local.
  const basePath = _args_.account_id
    ? pathTemplate
    : awsCredentials.S3_BASE_PATH;
  if (
    dateRange &&
    typeValue &&
    Array.isArray(dateRange) &&
    dateRange.length > 0
  ) {
    const typePath = basePath + `type=${typeValue}`;
    for (const dateValue of dateRange) {
      const currentPath = typePath + `/date=${dateValue}` + "/";
      datePaths.push(currentPath);
      // Record the bucket-relative prefix (everything from "v1/" on).
      s3PrefixList.push(
        currentPath.substr(currentPath.indexOf("v1/"), currentPath.length)
      );
    }
    if (Array.isArray(experimentList) && experimentList.length > 0) {
      for (const experiment of experimentList) {
        for (const s3Path of datePaths) {
          // NOTE(review): s3Path already ends with "/", so this yields a
          // double slash before "experiment=" — preserved as-is; confirm
          // against the actual S3 key layout.
          resultArray.push(s3Path + `/experiment=${experiment}`);
        }
      }
    } else {
      resultArray = datePaths;
    }
    switch (typeValue) {
      case "decisions":
        decisionPaths = resultArray;
        break;
      case "events":
        eventsPaths = resultArray;
        break;
      default:
        return {
          error:
            'Invalid event type. Values permitted are "events", "decisions" or "events_decisions".',
        };
    }
  }
  return resultArray;
}
/**
 * Dispatches S3 path building by event type and returns the merged list of
 * decision and event paths. Updates the module-level decisionPaths /
 * eventsPaths as a side effect of the per-type builder.
 * @param {string[]} dateList - Dates in yyyy-mm-dd form.
 * @param {string} eventType - "events", "decisions" or "events_decisions".
 * @param {string[]} [experimentList] - Optional experiment ids.
 * @returns {string[]|{error: string}} merged paths, or an error object.
 */
function buildS3PathList(dateList, eventType, experimentList = []) {
  const build = (type) =>
    buildS3PathListByEventType(dateList, type, experimentList);
  switch (eventType) {
    case "decisions":
      decisionPaths = build("decisions");
      break;
    case "events":
      eventsPaths = build("events");
      break;
    case "events_decisions":
      // Both tables: build each list independently.
      decisionPaths = build("decisions");
      eventsPaths = build("events");
      break;
    default:
      return {
        error:
          'Invalid event type. Values permitted are "events" or "decisions"',
      };
  }
  return [...decisionPaths, ...eventsPaths];
}
/**
 * Creates (and caches in the module-level s3Client) an @auth0/s3 client
 * configured with the temporary credentials from the export API.
 * @param {Object} credentials - awsCredentials-shaped object.
 * @returns {Object} the created client.
 */
function getS3Client(credentials) {
  const s3Options = {
    accessKeyId: credentials.AWS_ACCESS_KEY_ID,
    secretAccessKey: credentials.AWS_SECRET_ACCESS_KEY,
    sessionToken: credentials.AWS_SESSION_TOKEN,
    expireTime: credentials.AWS_SESSION_EXPIRATION,
    region: "us-east-1",
    //endpoint: 's3-aws-region.amazonaws.com',
    // sslEnabled: false
    // Any other options are passed to new AWS.S3()
    // See: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html#constructor-property
  };
  s3Client = s3.createClient({
    maxAsyncS3: 20, // this is the default
    s3RetryCount: 3, // this is the default
    s3RetryDelay: 3000, // this is the default
    multipartUploadThreshold: 20971520, // this is the default (20 MB)
    multipartUploadSize: 15728640, // this is the default (15 MB)
    s3Options,
  });
  return s3Client;
}
/**
 * Recursively deletes a directory tree (files first, then the directory).
 * No-op when the path does not exist.
 * @param {string} path - Directory to remove.
 */
const deleteFolderRecursive = function (path) {
  if (!fs.existsSync(path)) {
    return; // nothing to do
  }
  for (const entry of fs.readdirSync(path)) {
    const childPath = path + "/" + entry;
    if (fs.lstatSync(childPath).isDirectory()) {
      // Descend into subdirectory.
      deleteFolderRecursive(childPath);
    } else {
      // Plain file (or symlink): remove it.
      fs.unlinkSync(childPath);
    }
  }
  fs.rmdirSync(path);
};
// Flat list of all downloaded files (populated after each type completes).
let fileList = [];

/**
 * Recursively collects all file paths under a directory.
 * @param {string} dirPath - Directory to walk.
 * @param {string[]} [arrayOfFiles] - Accumulator used during recursion.
 * @returns {string[]} path of every regular file found, depth-first.
 */
const getAllFiles = function (dirPath, arrayOfFiles) {
  // BUG FIX: `files` was an implicit global (a ReferenceError in strict
  // mode, and a cross-call hazard otherwise); now properly scoped.
  const files = fs.readdirSync(dirPath);
  arrayOfFiles = arrayOfFiles || [];
  files.forEach(function (file) {
    if (fs.statSync(dirPath + "/" + file).isDirectory()) {
      arrayOfFiles = getAllFiles(dirPath + "/" + file, arrayOfFiles);
    } else {
      arrayOfFiles.push(path.join(dirPath, file));
    }
  });
  return arrayOfFiles;
};
/**
 * Per-object download filter passed to @auth0/s3. Skips folder-marker keys,
 * "experiment=NULL" partitions and SUC* marker files; everything else is
 * downloaded with default parameters.
 * @param {string} localFile - Local destination path for the S3 object.
 * @param {Object} s3Object - S3 object metadata (unused here).
 * @param {Function} callback - callback(err, s3Params|null).
 */
const getS3Params = function (localFile, s3Object, callback) {
  console.log("getS3params for: ", localFile);
  const skip =
    endsWith(localFile, "/") ||
    localFile.indexOf("experiment=NULL") > 0 ||
    localFile.indexOf("SUC") > 0;
  // A null s3Params tells the client to skip this object.
  callback(null, skip ? null : {});
};
/**
 * Returns true when str ends with suffix.
 * Replaced the hand-rolled indexOf arithmetic with the built-in
 * String.prototype.endsWith (ES6, which this file's syntax already assumes).
 * @param {string} str
 * @param {string} suffix
 * @returns {boolean}
 */
function endsWith(str, suffix) {
  return str.endsWith(suffix);
}
/**
 * Main entry point: validates arguments and credentials, obtains short-lived
 * AWS credentials from the Optimizely export API, builds the S3 prefixes for
 * the requested date range / type / experiments, and downloads the matching
 * files into <destinationFolder>/{decisions,events}.
 * @param {string} authToken - Optimizely personal access token.
 * @returns {Promise<{error: *}|string|undefined>} an error object/message on
 *   failure; undefined once the downloads have been kicked off.
 */
async function downloadFilesFromS3(authToken) {
  // BUG FIX: the original tested `_args_.length` (always undefined on a
  // plain object). Argument-validation errors are collected in argvErrors.
  if (argvErrors.length > 0) {
    const errors = { error: JSON.stringify(argvErrors) };
    console.log("Incomplete or invalid arguments", errors.error);
    return errors;
  }
  let dateRangeList = [];
  validCredentials = validateCredentials(authToken, null);
  // BUG FIX: `!validCredentials` was never true, because a failed check
  // returns a truthy {error} object. Compare against `true` explicitly.
  if (validCredentials !== true) {
    return {
      error: "Invalid credentials. A valid personal access token is missing.",
    };
  }
  const authApiResponse = await getObjectFromUrl(authEndpoint);
  // BUG FIX: guard against the undefined return (request failure) before
  // dereferencing .error, which previously threw a TypeError.
  if (!authApiResponse || authApiResponse.error) {
    return authApiResponse && authApiResponse.error;
  }
  const { response, status } = { ...authApiResponse };
  // Ensure ./data, ./data/events and ./data/decisions exist.
  // BUG FIX: `_appDir` was an implicit global — now a scoped const.
  const _appDir = path.join(appDir, "data");
  if (!fs.existsSync(_appDir)) {
    fs.mkdirSync(_appDir);
  }
  const eventsDir = path.join(appDir, "data", "events");
  if (!fs.existsSync(eventsDir)) {
    fs.mkdirSync(eventsDir);
  }
  const decisionsDir = path.join(appDir, "data", "decisions");
  if (!fs.existsSync(decisionsDir)) {
    fs.mkdirSync(decisionsDir);
  }
  // Copy the temporary AWS credentials returned by the export endpoint.
  awsCredentials.AWS_ACCESS_KEY_ID = response.credentials.accessKeyId;
  awsCredentials.AWS_SECRET_ACCESS_KEY = response.credentials.secretAccessKey;
  awsCredentials.AWS_SESSION_TOKEN = response.credentials.sessionToken;
  awsCredentials.AWS_SESSION_EXPIRATION = response.credentials.expiration;
  awsCredentials.S3_BASE_PATH = response.s3Path;
  //const _types = ["events", "decisions"];
  const _types = ["decisions"]; // NOTE(review): events downloads currently disabled
  if (!emptyValues(awsCredentials)) {
    dateRangeList = getDateList(_args_.startDate, _args_.endDate);
    if (dateRangeList && dateRangeList.error) {
      return dateRangeList; // invalid/inverted date range
    }
    mergedPathsList = buildS3PathList(
      dateRangeList,
      _args_.type,
      _args_.experimentList
    );
    if (mergedPathsList.length > 0) {
      _types.forEach(function (_type) {
        const s3_client = getS3Client(awsCredentials);
        s3PrefixList
          .filter((_prefix) => _prefix.indexOf(_type) !== -1)
          .forEach(function (prefix) {
            const params = {
              recursive: true,
              followSymlinks: true,
              //deleteRemoved: true,
              localDir: path.join(_args_.destinationFolder, _type),
              getS3Params: getS3Params,
              s3Params: {
                Bucket: s3_Bucket,
                Prefix: prefix,
              },
            };
            // Download the whole prefix (decisions or events folder).
            const s3EventListener = s3_client.downloadDir(params);
            s3EventListener.on("fileDownloadStart", function (
              localFilePath,
              s3Key
            ) {
              console.log(localFilePath, s3Key);
            });
            s3EventListener.on("fileDownloadEnd", function (
              localFilePath,
              s3Key
            ) {
              console.log(localFilePath, s3Key);
            });
            s3EventListener.on("progress", function () {
              // Progress reporting intentionally disabled (noisy).
            });
            s3EventListener.on("end", function () {
              // Collect the local paths of everything that was downloaded.
              if (_type === "events") {
                eventsFiles.push(...getAllFiles(eventsDir));
                fileList.push(eventsFiles);
              } else {
                decisionsFiles.push(...getAllFiles(decisionsDir));
                fileList.push(decisionsFiles);
              }
              console.log(`Completed downloading all files for ${_type}.`);
            });
            s3EventListener.on("error", function (err) {
              console.log("An error has occurred: ", err);
            });
          });
      });
    }
  }
  console.log(fileList);
}
// (Removed: a large block of commented-out parquet-reading experiments that
// referenced the disabled node-parquet dependency.)
// Kick off the download. BUG FIX: the promise was previously floating; a
// rejection inside the async entry point is now reported instead of
// becoming an unhandled rejection.
downloadFilesFromS3(_authToken_).catch(function (err) {
  console.log("An error has occurred: ", err);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.