Created
November 28, 2021 15:43
-
-
Save mshakhomirov/fc2ad8b2b0904e942cc8c745f131485b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
/**
 * Extracts rows for each configured table and saves them via queryDbAndSave.
 *
 * For each table descriptor: runs `table.querySize` to count pending rows;
 * when non-zero, streams `table.sql` to
 * `<key><table.name>/YYYY/MM/DD/HH/<table.name>_<mm>` in `bucket`.
 * Per-table errors are logged and the loop continues with the next table.
 *
 * @param {object} event  - trigger event (not used inside this function).
 * @param {Array<object>} tables - descriptors: { name, sql, querySize, output, dryRun }.
 * @param {string} bucket - destination S3 bucket.
 * @param {string} key    - S3 key prefix.
 * @returns {Promise<*>} result of pr() reporting how many tables were processed.
 */
const processEvent = async (event, tables, bucket, key) => {
    const now = moment();
    const datePrefix = now.format('YYYY/MM/DD/HH/');
    const minuteKey = now.format('mm').toString();
    const connection = await mysql.createConnection({
        host: config.get('roDb.host'),
        // NOTE(review): user is read from 'roDb.name' — confirm this config key
        // really holds the DB user and is not meant to be 'roDb.user'.
        user: config.get('roDb.name'),
        password: config.get('roDb.password'),
        timezone: '+00:00',
    });
    try {
        for (const table of tables) {
            const extractSql = table.sql;
            // Count first so empty tables are skipped without streaming.
            // Before extracting, add querySize to ./config.json, e.g.
            // "querySize": "select count(*) cnt, NOW() ts from schema.some_table_2;"
            const getSize = `${table.querySize}`;
            try {
                const [rows] = await connection.execute(getSize);
                const size = rows[0].cnt;
                const ts = rows[0].ts;
                console.log(`Query size : ${size}, db NOW() : ${ts}`);
                if (size !== 0) {
                    // Construct the file name to save into.
                    const s3key = `${key + table.name}/${datePrefix}${table.name}_${minuteKey}`;
                    await queryDbAndSave(connection, extractSql, size, ts, s3key, bucket, table.output, table.dryRun);
                } else {
                    console.log(`Table ${table.name} : has no changes/results`);
                }
            } catch (e) {
                // Log and move on to the next table rather than aborting the whole batch.
                pr(e);
                console.log(`ERROR: ${e.code} ${e.sqlMessage}`);
            }
        }
    } finally {
        // Fix: the connection was previously never closed (leaked on every run).
        await connection.end();
    }
    return pr({ 'Finished processing tables': `${tables.length}` });
};
/**
 * Streams the result of `sql` from MySQL and saves it, depending on `output`:
 *  - 'local'    : pipe rows through csv-stringify into ./output.csv
 *  - 's3'       : buffer rows into BATCH_SIZE chunks; each chunk is uploaded
 *                 as JSON to s3://<bucket>/<s3key><batchNumber>
 *  - 's3Stream' : pipe all rows as newline-delimited JSON into one S3 object
 *                 through a PassThrough stream
 *
 * @param {object}  connection   - mysql connection wrapper (.connection exposes the callback API).
 * @param {string}  sql          - extraction query.
 * @param {number}  totalRecords - expected row count, used to flush the final partial batch.
 * @param {*}       ts           - DB timestamp from the size query (currently unused in the query).
 * @param {string}  s3key        - destination key ('s3' mode appends the batch number).
 * @param {string}  bucket       - destination S3 bucket.
 * @param {string}  [output='local'] - 'local' | 's3' | 's3Stream'.
 * @param {boolean} [dryRun=false]   - in 's3' mode, skip the actual uploads.
 * @returns {Promise<*>} resolves with a status message or the processed-row count.
 */
const queryDbAndSave = async (connection, sql, totalRecords, ts, s3key, bucket, output = 'local', dryRun = false) => {
    return new Promise((resolve, reject) => {
        let recordsProcessed = 0;
        let batchNumber = 1;
        let batchRecords = [];
        // Callback-API query so the socket can be paused/resumed for backpressure.
        const s1 = connection.connection.query(sql);
        if (output === 'local') {
            const outputStream = fs.createWriteStream('output.csv', { encoding: 'utf8' });
            // stream() just wraps the query into a pipeable object; 'result'
            // events are emitted either way.
            s1.stream({ highWaterMark: BATCH_SIZE })
                .pipe(csvstringify({ header: true }))
                .pipe(outputStream)
                .on('finish', () => { resolve('saved data locally'); });
        } else if (output === 's3') {
            s1.on('result', (row) => {
                ++recordsProcessed;
                batchRecords.push(row);
                // Flush on a full batch, or on the final (possibly partial) batch.
                if (recordsProcessed === (BATCH_SIZE * batchNumber) || recordsProcessed === totalRecords) {
                    if (!dryRun) {
                        // Pause the MySQL socket so rows don't pile up in memory
                        // while this batch is in flight to S3.
                        connection.pause();
                        pr(` batch ${batchNumber}, pushing to s3, ${batchRecords.length}, totalRecordsProcessed = ${recordsProcessed}`);
                        const params = { Bucket: bucket, Key: s3key + batchNumber, Body: JSON.stringify(batchRecords) };
                        // Capture the batch number: batchNumber mutates before the upload settles.
                        const currentBatch = batchNumber;
                        s3.upload(params).promise()
                            .then(() => {
                                // Fix: log success only after the upload actually completes
                                // (previously logged immediately, before the upload finished).
                                pr(`saved ${currentBatch} to aws s3 cp s3://${bucket}/${s3key}${currentBatch} ./tmp/${s3key}${currentBatch}.csv`);
                                connection.resume();
                            })
                            .catch((err) => {
                                // Fix: a failed upload previously left the connection paused
                                // forever and the returned promise never settled.
                                connection.destroy();
                                reject(err);
                            });
                    }
                    batchRecords = [];
                    ++batchNumber;
                }
            });
            s1.on('end', () => { resolve(recordsProcessed); });
        } else if (output === 's3Stream') {
            // One file via node.js streaming, saved as newline-delimited JSON.
            const uploadStream = ({ Bucket, Key }) => {
                const pass = new stream.PassThrough();
                return {
                    writeStream: pass,
                    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
                };
            };
            const { writeStream, promise } = uploadStream({ Bucket: bucket, Key: s3key });
            pr(`saving to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`);
            s1.stream({ highWaterMark: BATCH_SIZE })
                // eslint-disable-next-line func-names
                .pipe(through2.obj(function(row, enc, next) {
                    this.push(`${JSON.stringify(row)}\n`);
                    next();
                }))
                .pipe(writeStream)
                .on('close', () => {
                    console.log('upload finished');
                });
            promise.then(() => {
                console.log('upload completed successfully');
                resolve(`saved to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`);
            }).catch((err) => {
                console.log('upload failed.', err.message);
                // Fix: failure was previously only logged; the returned promise hung forever.
                reject(err);
            });
        }
        s1.on('error', (error) => {
            connection.destroy();
            reject(error);
        });
    });
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment