...
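// NOTE: the gist elides its imports and setup above. A plausible reconstruction,
// assuming mysql2/promise for streaming queries, the csv-stringify v5-style
// stream API, through2, the v2 aws-sdk, and the node-config package.
// BATCH_SIZE and the pr() log helper are assumptions, not part of the gist.
const moment = require('moment');
const mysql = require('mysql2/promise');
const fs = require('fs');
const stream = require('stream');
const csvstringify = require('csv-stringify');
const through2 = require('through2');
const config = require('config');
const AWS = require('aws-sdk');

const s3 = new AWS.S3();
const BATCH_SIZE = 10000; // rows per S3 batch and stream highWaterMark (assumed value)
const pr = (...args) => console.log(...args); // shorthand logger used throughout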
const processEvent = async (event, tables, bucket, key) => {
  const now = moment();
  const datePrefix = now.format('YYYY/MM/DD/HH/');
  const minuteKey = now.format('mm');
  const connection = await mysql.createConnection({
    host: config.get('roDb.host'),
    user: config.get('roDb.name'),
    password: config.get('roDb.password'),
    timezone: '+00:00',
  });
  for (const table of tables) {
    const veryBigSql = table.sql;
    // Find the number of rows in the result set before extracting. Add a
    // querySize entry to ./config.json, e.g.
    // "querySize": "select count(*) cnt, NOW() ts from schema.some_table_2;"
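    // A hedged example of the ./config.json shape this code expects (an
    // assumption reconstructed from the config.get() calls, not from the gist):
    // {
    //   "roDb": { "host": "...", "name": "...", "password": "..." },
    //   "Tables": [{
    //     "name": "some_table_2",
    //     "sql": "select * from schema.some_table_2;",
    //     "querySize": "select count(*) cnt, NOW() ts from schema.some_table_2;",
    //     "output": "s3Stream",
    //     "dryRun": false
    //   }]
    // }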
    const getSize = table.querySize;
    try {
      const [rows] = await connection.execute(getSize);
      const size = rows[0].cnt;
      const ts = rows[0].ts;
      console.log(`Query size: ${size}, db NOW(): ${ts}`);
      if (size !== 0) {
        // Construct the object key to save into, partitioned by date and hour.
        const s3key = `${key + table.name}/${datePrefix}${table.name}_${minuteKey}`;
        await queryDbAndSave(connection, veryBigSql, size, ts, s3key, bucket, table.output, table.dryRun);
      } else {
        console.log(`Table ${table.name}: has no changes/results`);
      }
    } catch (e) {
      pr(e);
      console.log(`ERROR: ${e.code} ${e.sqlMessage}`);
    }
  }
  return pr({ 'Finished processing tables': `${tables.length}` });
};
const queryDbAndSave = async (connection, sql, totalRecords, ts, s3key, bucket, output = 'local', dryRun = false) => {
  return new Promise((resolve, reject) => {
    let recordsProcessed = 0;
    let batchNumber = 1;
    let batchRecords = [];
    // Use the underlying (non-promise) connection to get a streaming query.
    // A parameterized variant would be: connection.connection.query(sql, [ts]);
    const s1 = connection.connection.query(sql);
    if (output === 'local') {
      const outputStream = fs.createWriteStream('output.csv', { encoding: 'utf8' });
      s1.stream({ highWaterMark: BATCH_SIZE }) // stream() just wraps the query in a pipeable Readable; 'result' events are emitted either way.
        .pipe(csvstringify({ header: true }))
        .pipe(outputStream)
        .on('finish', () => { resolve('saved data locally'); });
    } else if (output === 's3') {
      s1.on('result', (row) => {
        ++recordsProcessed;
        batchRecords.push(row);
        if (recordsProcessed === (BATCH_SIZE * batchNumber) || recordsProcessed === totalRecords) {
          if (!dryRun) {
            connection.pause();
            pr(`batch ${batchNumber}, pushing to s3, ${batchRecords.length}, totalRecordsProcessed = ${recordsProcessed}`);
            // Batch mode: split the result set into one smaller S3 object per batch.
            const params = { Bucket: bucket, Key: s3key + batchNumber, Body: JSON.stringify(batchRecords) };
            // eslint-disable-next-line promise/catch-or-return
            s3.upload(params).promise()
              .then(() => {
                pr(`saved: aws s3 cp s3://${bucket}/${params.Key} ./tmp/${params.Key}`);
                connection.resume();
              });
          }
          batchRecords = [];
          ++batchNumber;
        }
      });
      s1.on('end', () => { resolve(recordsProcessed); });
    } else if (output === 's3Stream') { // Stream as one file, saved as newline-delimited JSON.
      const uploadStream = ({ Bucket, Key }) => {
        const pass = new stream.PassThrough();
        return {
          writeStream: pass,
          promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
        };
      };
      const { writeStream, promise } = uploadStream({ Bucket: bucket, Key: s3key });
      pr(`saving to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`);
      s1.stream({ highWaterMark: BATCH_SIZE }) // stream() just wraps the query in a pipeable Readable; 'result' events are emitted either way.
        // eslint-disable-next-line func-names
        .pipe(through2.obj(function(row, enc, next) {
          this.push(`${JSON.stringify(row)}\n`); // one JSON object per line (NDJSON)
          next();
        }))
        .pipe(writeStream)
        .on('close', () => {
          console.log('upload finished');
        });
      promise.then(() => {
        console.log('upload completed successfully');
        resolve(`saved to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`);
      }).catch((err) => {
        console.log('upload failed.', err.message);
        reject(err);
      });
    }
    s1.on('error', (error) => {
      connection.destroy();
      reject(error);
    });
  });
};
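// Hypothetical wiring (not part of the gist): invoke processEvent as a Lambda
// handler, reading the table list, bucket, and key prefix from config. The
// config keys and table shape below are assumptions for illustration only.
exports.handler = async (event) => {
  const tables = config.get('Tables'); // [{ name, sql, querySize, output, dryRun }, ...]
  return processEvent(event, tables, config.get('s3.bucket'), config.get('s3.keyPrefix'));
};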