Copying data from MongoDB to S3 in a Lambda
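The four files below are variants of the same Lambda handler: two brute-force versions that page through the collection with aggregation queries and buffer everything in memory, and two streaming versions that pipe the aggregation cursor straight into an S3 multipart upload. First, the brute-force version that writes the buffered NDJSON payload with a single putObject call.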
const MongoClient = require('mongodb').MongoClient;
const S3Client = require('aws-sdk/clients/s3');

//brute force method: load all the data into an array, then upload it in one putObject call
exports.copyData = async (event, context) => {
    //required when reusing the mongodb connection pool: do not wait for the event loop (open sockets) to drain
    context.callbackWaitsForEmptyEventLoop = false;
    let dbConnection = await MongoClient.connect(getDBURI(), {
        useNewUrlParser: true,
        useUnifiedTopology: true
    });
    let queryResultPromises = [];
    let numPages = <number-of-pages>; //calculate the number of pages from the collection count and page size
    //iterate through the pages, issuing one aggregation query per page
    //using $limit and $skip in the aggregation pipeline
    for (let pageNum = 0; pageNum < numPages; pageNum++) {
        let tempResultPromise = dbConnection.db("<db-name>").collection("<collection-name>")
            .aggregate(<aggregate-criteria>)
            .toArray();
        //collect the query promise in an array
        queryResultPromises.push(tempResultPromise);
    }
    //wait for all the query promises to resolve using Promise.all
    let queryResultsArray = await Promise.all(queryResultPromises);
    //concatenate all the page results into a single array
    let queryResults = [];
    queryResultsArray.forEach(resultArray => {
        queryResults = queryResults.concat(resultArray);
    });
    await uploadDataToS3(queryResults);
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter Store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    let ndJSON = [];
    data.forEach(el => ndJSON.push(JSON.stringify(el), "\n"));
    return ndJSON;
};

//code to upload data to s3
let uploadDataToS3 = async (data) => {
    let s3 = null;
    //using minio for local s3 testing; assumes NODE_ENV is set to 'local' for local runs
    if (process.env.NODE_ENV === 'local') {
        s3 = new S3Client({
            accessKeyId: 'minioadmin',
            secretAccessKey: 'minioadmin',
            endpoint: 'http://host.docker.internal:9000',
            s3ForcePathStyle: true, //required for minio (path-style addressing)
            signatureVersion: 'v4'
        });
    } else {
        s3 = new S3Client();
    }
    let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: convertToNDJSON(data).join("")};
    return s3.putObject(params).promise();
};
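The <number-of-pages> placeholder and the paging stages inside <aggregate-criteria> are left to the reader. A minimal sketch of how they might be filled in inside the handler, assuming a hypothetical PAGE_SIZE constant and a <match-criteria> placeholder for whatever filter you use:

const PAGE_SIZE = 1000; //hypothetical page size, tune to your document size
let totalDocs = await dbConnection.db("<db-name>").collection("<collection-name>")
    .countDocuments(<match-criteria>);
let numPages = Math.ceil(totalDocs / PAGE_SIZE);
//inside the paging loop: skip the pages already read, then take one page
let aggregateCriteria = [
    { $match: <match-criteria> },
    { $skip: pageNum * PAGE_SIZE },
    { $limit: PAGE_SIZE }
];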
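The next file is the streaming variant: the aggregation cursor is exposed as a readable stream, each record is transformed to NDJSON, and the stream is piped into a PassThrough that serves as the body of an S3 multipart upload.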
const MongoClient = require('mongodb').MongoClient;
const S3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;

//streaming method: pipe the aggregation cursor straight into an s3 multipart upload
exports.copyData = (event, context, callback) => {
    MongoClient.connect(getDBURI(), {
        useNewUrlParser: true,
        useUnifiedTopology: true
    }).then((dbConnection) => {
        pipeline(
            dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
                .stream({transform: x => convertToNDJSON(x)}),
            uploadDataToS3(callback),
            (err) => {
                if (err) {
                    console.log('Pipeline failed.', err);
                } else {
                    console.log('Pipeline succeeded.');
                }
            }
        );
    });
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter Store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts a single db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    return JSON.stringify(data) + "\n";
};

//returns a PassThrough stream that the pipeline writes into; s3 uploads from the same stream
let uploadDataToS3 = (callback) => {
    let s3 = null;
    let pass = new stream.PassThrough();
    //using minio for local s3 testing; assumes NODE_ENV is set to 'local' for local runs
    if (process.env.NODE_ENV === 'local') {
        s3 = new S3Client({
            accessKeyId: 'minioadmin',
            secretAccessKey: 'minioadmin',
            endpoint: 'http://host.docker.internal:9000',
            s3ForcePathStyle: true, //required for minio (path-style addressing)
            signatureVersion: 'v4'
        });
    } else {
        s3 = new S3Client();
    }
    //using multipart upload to speed up the process; the PassThrough stream is the upload body
    let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: pass};
    let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
    s3.upload(params, opts, function(err, data) {
        if (err) {
            console.log(`Error uploading file ${params.Key}`, err);
        } else {
            console.log(`Successfully uploaded file: ${params.Key}, result: ${JSON.stringify(data)}`);
        }
        //signal lambda completion (with the error, if any) once the upload finishes
        callback(err);
    });
    return pass;
};
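Because the PassThrough stream is the upload Body, s3.upload consumes records as the pipeline writes them, so memory usage is roughly bounded by the multipart settings (queueSize × partSize) rather than by the size of the result set.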
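The next file is the brute-force approach again, but the buffered records are converted to NDJSON up front and written with s3.upload, which performs a managed multipart upload instead of a single putObject.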
const MongoClient = require('mongodb').MongoClient;
const S3Client = require('aws-sdk/clients/s3');

//brute force method: load all the data into an array, then upload it with a multipart upload
exports.copyData = async (event, context) => {
    //required when reusing the mongodb connection pool: do not wait for the event loop (open sockets) to drain
    context.callbackWaitsForEmptyEventLoop = false;
    let dbConnection = await MongoClient.connect(getDBURI(), {
        useNewUrlParser: true,
        useUnifiedTopology: true
    });
    let queryResultPromises = [];
    let numPages = <number-of-pages>; //calculate the number of pages from the collection count and page size
    //iterate through the pages, issuing one aggregation query per page
    //using $limit and $skip in the aggregation pipeline
    for (let pageNum = 0; pageNum < numPages; pageNum++) {
        let tempResultPromise = dbConnection.db("<db-name>").collection("<collection-name>")
            .aggregate(<aggregate-criteria>)
            .toArray();
        //collect the query promise in an array
        queryResultPromises.push(tempResultPromise);
    }
    //wait for all the query promises to resolve using Promise.all
    let queryResultsArray = await Promise.all(queryResultPromises);
    //flatten the per-page arrays into a single array of records
    let data = [].concat.apply([], queryResultsArray);
    await uploadDataToS3(data.map(convertToNDJSON).join(""));
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter Store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts a single db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    return JSON.stringify(data) + "\n";
};

//code to upload data to s3
let uploadDataToS3 = async (data) => {
    let s3 = null;
    //using minio for local s3 testing; assumes NODE_ENV is set to 'local' for local runs
    if (process.env.NODE_ENV === 'local') {
        s3 = new S3Client({
            accessKeyId: 'minioadmin',
            secretAccessKey: 'minioadmin',
            endpoint: 'http://host.docker.internal:9000',
            s3ForcePathStyle: true, //required for minio (path-style addressing)
            signatureVersion: 'v4'
        });
    } else {
        s3 = new S3Client();
    }
    //using multipart upload to speed up the process
    let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: data};
    let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
    return s3.upload(params, opts).promise();
};
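The final file is the streaming approach again, with the lambda callback invoked from the pipeline's completion handler rather than from inside the upload helper.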
const MongoClient = require('mongodb').MongoClient;
const S3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;

//streaming method: pipe the aggregation cursor straight into an s3 multipart upload
exports.copyData = (event, context, callback) => {
    MongoClient.connect(getDBURI(), {
        useNewUrlParser: true,
        useUnifiedTopology: true
    }).then((dbConnection) => {
        pipeline(
            dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
                .stream({transform: x => convertToNDJSON(x)}),
            uploadDataToS3(),
            (err) => {
                if (err) {
                    console.log('Pipeline failed.', err);
                } else {
                    console.log('Pipeline succeeded.');
                }
                //signal lambda completion (with the error, if any) once the pipeline has drained
                callback(err);
            }
        );
    });
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter Store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts a single db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    return JSON.stringify(data) + "\n";
};

//returns a PassThrough stream that the pipeline writes into; s3 uploads from the same stream
let uploadDataToS3 = () => {
    let s3 = null;
    let pass = new stream.PassThrough();
    //using minio for local s3 testing; assumes NODE_ENV is set to 'local' for local runs
    if (process.env.NODE_ENV === 'local') {
        s3 = new S3Client({
            accessKeyId: 'minioadmin',
            secretAccessKey: 'minioadmin',
            endpoint: 'http://host.docker.internal:9000',
            s3ForcePathStyle: true, //required for minio (path-style addressing)
            signatureVersion: 'v4'
        });
    } else {
        s3 = new S3Client();
    }
    //using multipart upload to speed up the process; the PassThrough stream is the upload body
    let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: pass};
    let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
    s3.upload(params, opts, function(err, data) {
        if (err) {
            console.log(`Error uploading file ${params.Key}`, err);
        } else {
            console.log(`Successfully uploaded file: ${params.Key}, result: ${JSON.stringify(data)}`);
        }
    });
    return pass;
};
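If an async handler is preferred over the callback style, stream.pipeline can be promisified with util.promisify. A minimal sketch, assuming the getDBURI, convertToNDJSON, and no-callback uploadDataToS3 helpers defined in the file above:

const util = require('util');
const pipelineAsync = util.promisify(stream.pipeline);

exports.copyData = async (event, context) => {
    context.callbackWaitsForEmptyEventLoop = false;
    let dbConnection = await MongoClient.connect(getDBURI(), {
        useNewUrlParser: true,
        useUnifiedTopology: true
    });
    //resolves once the cursor has been fully drained into the PassThrough stream;
    //the s3.upload result is still reported through its own callback inside uploadDataToS3
    await pipelineAsync(
        dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
            .stream({transform: x => convertToNDJSON(x)}),
        uploadDataToS3()
    );
};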