Skip to content

Instantly share code, notes, and snippets.

@anandsunderraman
Last active May 3, 2020 18:11
Show Gist options
  • Save anandsunderraman/868d1aa8f379c9d15dbc029608befc31 to your computer and use it in GitHub Desktop.
Save anandsunderraman/868d1aa8f379c9d15dbc029608befc31 to your computer and use it in GitHub Desktop.
Copying data from mongodb to s3 in a lambda
const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
//brute force method loading all the data into an array
exports.copyData = async (event, context) => {
//this is required for node js mongodb connection pooling
context.callbackWaitsForEmptyEventLoop = false;
let dbConnection = await MongoClient.connect(getDBURI(), {
useNewUrlParser: true,
useUnifiedTopology: true
});
let queryResultPromises = [];
let numPages = //calculate the number of pages;
//iterating through num of pages and iterate using an aggregation query
//using $limit and $skip in aggregates
for(let pageNum = 0; pageNum < numPages; pageNum++) {
let tempResultPromise = await dbConnection.db("<db-name>").collection("<collection-name>")
.aggregate(<aggregate-criteria>)
.toArray()
//collect the query promise in an array
queryResultPromises.push(tempResultPromise);
}
//collect all the query results in an array and wait for them to resolve using Promise.all
let queryResultsArray = await Promise.all(queryResultPromises);
//concatenating all the results in a single array
let queryResults = [];
queryResultsArray.forEach(resultArray => {
queryResults = queryResults.concat(resultArray);
});
await uploadDataToS3(queryResults);
}
/**
* Construct the DB URI based on the environment
* @returns {string}
*/
const getDBURI = () => {
//best practice is to fetch the password from AWS Parameter store
return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};
//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
let ndJSON = [];
data.forEach(el => ndJSON.push(JSON.stringify(el), "\n"));
return ndJSON;
};
//code to upload data to s3
let uploadDataToS3 = async (data) => {
let env = process.env;
let s3 = null;
//using minio for local s3 testing
if (env === 'local') {
s3 = new s3Client({
accessKeyId: 'minioadmin' ,
secretAccessKey: 'minioadmin' ,
endpoint: 'http://host.docker.internal:9000' ,
s3ForcePathStyle: true, // needed with minio?
signatureVersion: 'v4'
});
} else {
s3 = new s3Client();
}
let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: convertToNDJSON(data).join("")};
return await s3.putObject(params).promise();
};
const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;
//brute force method loading all the data into an array
exports.copyData = (event, context, callback) => {
MongoClient.connect(getDBURI(), {
useNewUrlParser: true,
useUnifiedTopology: true
}).then((dbConnection) => {
pipeline(
dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
.stream({transform: x => convertToNDJSON(x)}),
uploadDataToS3(callback),
(err) => {
if (err) {
console.log('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
})
}
/**
* Construct the DB URI based on the environment
* @returns {string}
*/
const getDBURI = () => {
//best practice is to fetch the password from AWS Parameter store
return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};
//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
return JSON.stringify(data) + "\n";
};
let uploadDataToS3 = (callback) => {
let env = process.env;
let s3 = null;
let pass = new stream.PassThrough();
if (env === 'local') {
s3 = new s3Client({
accessKeyId: 'minioadmin' ,
secretAccessKey: 'minioadmin' ,
endpoint: 'http://host.docker.internal:9000' ,
s3ForcePathStyle: true, // needed with minio?
signatureVersion: 'v4'
});
} else {
s3 = new s3Client();
}
//using multipart upload to speed up the process
let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: data};
let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
s3.upload(params,opts, function(err, data) {
if (err) {
console.log(`Error uploading file ${file-name}`,err);
} else {
console.log(`Successfully uploaded file: ${file-name}, result: ${JSON.stringify(data)}`);
}
callback();
});
return pass;
};
const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
//brute force method loading all the data into an array
exports.copyData = async (event, context) => {
//this is required for node js mongodb connection pooling
context.callbackWaitsForEmptyEventLoop = false;
let dbConnection = await MongoClient.connect(getDBURI(), {
useNewUrlParser: true,
useUnifiedTopology: true
});
let queryResultPromises = [];
let numPages = //calculate the number of pages;
//iterating through num of pages and iterate using an aggregation query
//using $limit and $skip in aggregates
for(let pageNum = 0; pageNum < numPages; pageNum++) {
let tempResultPromise = await dbConnection.db("<db-name>").collection("<collection-name>")
.aggregate(<aggregate-criteria>)
.toArray()
//collect the query promise in an array
queryResultPromises.push(tempResultPromise);
}
//collect all the query results in an array and wait for them to resolve using Promise.all
let queryResultsArray = await Promise.all(queryResultPromises);
let data = [].concat.apply([], queryResultsArray);
await uploadDataToS3(data.map(convertToNDJSON).join(""));
}
/**
* Construct the DB URI based on the environment
* @returns {string}
*/
const getDBURI = () => {
//best practice is to fetch the password from AWS Parameter store
return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};
//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
let ndJSON = [];
data.forEach(el => ndJSON.push(JSON.stringify(el), "\n"));
return ndJSON;
};
//code to upload data to s3
let uploadDataToS3 = async (data) => {
let env = process.env;
let s3 = null;
//using minio for local s3 testing
if (env === 'local') {
s3 = new s3Client({
accessKeyId: 'minioadmin' ,
secretAccessKey: 'minioadmin' ,
endpoint: 'http://host.docker.internal:9000' ,
s3ForcePathStyle: true, // needed with minio?
signatureVersion: 'v4'
});
} else {
s3 = new s3Client();
}
//using multipart upload to speed up the process
let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: data};
let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
return await s3.upload(params, opts).promise();
};
const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;
//brute force method loading all the data into an array
exports.copyData = (event, context, callback) => {
MongoClient.connect(getDBURI(), {
useNewUrlParser: true,
useUnifiedTopology: true
}).then((dbConnection) => {
pipeline(
dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
.stream({transform: x => convertToNDJSON(x)}),
uploadDataToS3(),
(err) => {
if (err) {
console.log('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
callback();
}
)
})
}
/**
* Construct the DB URI based on the environment
* @returns {string}
*/
const getDBURI = () => {
//best practice is to fetch the password from AWS Parameter store
return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};
//converts each db record to ndjson => newline delimited json
//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
return JSON.stringify(data) + "\n";
};
let uploadDataToS3 = () => {
let env = process.env;
let s3 = null;
let pass = new stream.PassThrough();
if (env === 'local') {
s3 = new s3Client({
accessKeyId: 'minioadmin' ,
secretAccessKey: 'minioadmin' ,
endpoint: 'http://host.docker.internal:9000' ,
s3ForcePathStyle: true, // needed with minio?
signatureVersion: 'v4'
});
} else {
s3 = new s3Client();
}
//using multipart upload to speed up the process
let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: data};
let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
s3.upload(params,opts, function(err, data) {
if (err) {
console.log(`Error uploading file ${file-name}`,err);
} else {
console.log(`Successfully uploaded file: ${file-name}, result: ${JSON.stringify(data)}`);
}
});
return pass;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment