Last active
July 10, 2018 09:45
-
-
Save NeoyeElf/76b3fbdbc1d5f14f654a7de2254a4c02 to your computer and use it in GitHub Desktop.
将atext和doc_histories中大于threshold的content上传至oss (Upload content larger than the size threshold from the atext and doc_histories tables to OSS)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const oss = require('./index')
const mysql = require('mysql')
const fs = require('fs')
const { v4: uuid } = require('uuid')
const EventEmitter = require('events')

// Trace files persist the last migrated primary key so an interrupted run can resume.
const ATEXT_TRACE_FILE = 'atext_trace_id.log'
const DOC_HISTORIES_TRACE_FILE = 'docHistories_trace_id.log'

// Every row uploaded to OSS gets a compensating UPDATE appended here,
// allowing the migration to be rolled back by replaying rollback.sql.
const rollbackWriterStream = fs.createWriteStream('rollback.sql')
const emitter = new EventEmitter()

// NOTE(review): credentials are hard-coded; move to environment variables before reuse.
const HOST = 'localhost'
const USER = 'root' // typo fix: was `USRE` (referenced only within this setup span)
const PASSWORD = '123456'
const DATABASE = 'cow_dev'

// Two dedicated streaming connections — one per table — so pause()/resume()
// on one migration's result stream does not stall the other.
const queryAtextConnection = mysql.createConnection({
  host: HOST,
  user: USER,
  password: PASSWORD,
  database: DATABASE
});
const queryDocHistoriesConnection = mysql.createConnection({
  host: HOST,
  user: USER,
  password: PASSWORD,
  database: DATABASE
});

// Pool for the short-lived per-row UPDATE statements.
var pool = mysql.createPool({
  connectionLimit : 5,
  host : HOST,
  user : USER,
  password : PASSWORD,
  database : DATABASE
});

const ATEXT_BUCKET = 'dev-a-text'
const DOC_HISTORIES_BUCKET = 'dev-doc-histories'
const LIMIT = 1000                  // rows fetched per batch
const FILE_SIZE_THRESHOLD = 10240   // bytes; only content larger than this is moved to OSS
/**
 * Read the last processed row id from a trace file.
 * Opens with 'a+' so the file is created on first run without truncating
 * existing content.
 * @param {string} filePath - path of the trace file
 * @returns {Promise<number>} persisted id, or 0 when the file is empty/new
 */
function getLastId(filePath) {
  return new Promise((resolve, reject) => {
    fs.open(filePath, 'a+', function(err, fd) {
      if (err) {
        // bug fix: original lacked `return` and fell through to readFileSync
        return reject(err)
      }
      try {
        const data = fs.readFileSync(filePath).toString()
        // bug fix: original called resolve(Number(data)) AND resolve(0)
        resolve(data === '' ? 0 : Number(data))
      } catch (readErr) {
        reject(readErr)
      } finally {
        // bug fix: original leaked the file descriptor
        fs.close(fd, () => {})
      }
    });
  })
}
/**
 * Persist the last processed id so an interrupted run can resume.
 * @param {string} filePath - trace file to overwrite
 * @param {number} lastId - id to record
 */
function traceLastId(filePath, lastId) {
  // bug fix: writeFileSync rejects bare numbers ("data" must be string/Buffer);
  // stringify explicitly.
  fs.writeFileSync(filePath, String(lastId))
}
/**
 * Entry point: resume both table migrations from their trace files and run
 * them concurrently; logs 'all done!' once both emit 'migration_done'.
 * @param {number} limit - batch size passed to each migration's SELECT
 */
async function migration(limit) {
  try {
    const [atextLastId, docHistoriesLastId] = await Promise.all([
      getLastId(ATEXT_TRACE_FILE),
      getLastId(DOC_HISTORIES_TRACE_FILE)
    ])
    // bug fix: attach listeners BEFORE starting work so no early
    // 'error' / 'migration_done' event can be missed.
    rollbackWriterStream.on('error', (err) => {
      console.error('write stream error', err)
    })
    let pending = 2
    emitter.on('migration_done', () => {
      if (--pending === 0) {
        console.log('all done!')
      }
    })
    docHistoriesMigration(docHistoriesLastId, limit)
    atextMigration(atextLastId, limit)
  } catch (err) {
    console.error(err)
  }
}
/**
 * Migrate one batch of doc_histories rows (history_type = 2) to OSS, then
 * recurse for the next batch. For every row actually uploaded, a
 * compensating UPDATE is appended to the rollback script.
 * @param {number} lastId - resume after this primary key
 * @param {number} limit - batch size
 */
function docHistoriesMigration(lastId, limit) {
  let rowCount = 0
  let maxSeenId = lastId
  // Placeholders instead of template interpolation — guards against SQL injection.
  const query = queryDocHistoriesConnection.query(
    'SELECT content, file_guid as guid, id FROM doc_histories WHERE id > ? AND history_type = 2 LIMIT ?',
    [lastId, limit]
  )
  query
    .on('error', function(err) {
      console.error('mysql query error', err)
    })
    .on('result', function(row) {
      rowCount++
      maxSeenId = row.id
      traceLastId(DOC_HISTORIES_TRACE_FILE, maxSeenId)
      // Pausing the connection is useful since processing involves I/O.
      queryDocHistoriesConnection.pause();
      rowProcess(row, DOC_HISTORIES_BUCKET, updateDocHistoriesContent, 'doc_histories')
        .then(uploadedToOSS => {
          if (uploadedToOSS) {
            // bug fix: mysql.escape() quotes the value; raw interpolation
            // produced a broken rollback script whenever content contained
            // a single quote.
            rollbackWriterStream.write(`UPDATE doc_histories SET content = ${mysql.escape(row.content)} WHERE id = ${row.id};\n`, 'utf8')
          }
          queryDocHistoriesConnection.resume()
        })
        .catch((err) => {
          console.error(`[doc_histories] id: ${row.id} rowProcess error`, err)
          // bug fix: original never resumed on error, stalling the stream.
          queryDocHistoriesConnection.resume()
        });
    })
    .on('end', function() {
      // all rows of this batch have been received
      if (rowCount > 0) {
        docHistoriesMigration(maxSeenId, limit)
      } else {
        console.log('docHistories migration done!')
        emitter.emit('migration_done')
      }
    });
}
/**
 * Migrate one batch of atext rows to OSS, then recurse for the next batch.
 * For every row actually uploaded, a compensating UPDATE is appended to the
 * rollback script.
 * @param {number} lastId - resume after this primary key
 * @param {number} limit - batch size
 */
function atextMigration(lastId, limit) {
  let rowCount = 0
  let maxSeenId = lastId
  // Placeholders instead of template interpolation — guards against SQL injection.
  const query = queryAtextConnection.query(
    'SELECT id, content, file_id as fileId, user_id as userId FROM atext WHERE id > ? LIMIT ?',
    [lastId, limit]
  )
  query
    .on('error', function(err) {
      console.error('mysql query error', err)
    })
    .on('result', function(row) {
      rowCount++
      maxSeenId = row.id
      traceLastId(ATEXT_TRACE_FILE, maxSeenId)
      // Pausing the connection is useful since processing involves I/O.
      queryAtextConnection.pause();
      rowProcess(row, ATEXT_BUCKET, updateATextContent, 'atext')
        .then(uploadedToOSS => {
          if (uploadedToOSS) {
            // bug fix: mysql.escape() quotes the value; raw interpolation
            // produced a broken rollback script whenever content contained
            // a single quote.
            rollbackWriterStream.write(`UPDATE atext SET content = ${mysql.escape(row.content)} WHERE id = ${row.id};\n`, 'utf8')
          }
          queryAtextConnection.resume()
        })
        .catch((err) => {
          console.error(`[atext] id: ${row.id} rowProcess error`, err)
          // bug fix: original never resumed on error, stalling the stream.
          queryAtextConnection.resume()
        });
    })
    .on('end', function() {
      // all rows of this batch have been received
      if (rowCount > 0) {
        atextMigration(maxSeenId, limit)
      } else {
        console.log('atext migration done!')
        emitter.emit('migration_done')
      }
    });
}
/**
 * Upload a row's content to OSS when it is large enough and not already
 * migrated, then rewrite the DB row to point at the OSS key.
 * @param {object} row - carries id + content, plus guid (doc_histories) or fileId (atext)
 * @param {string} bucket - destination OSS bucket
 * @param {Function} updateContentFn - (id, ossKey) => Promise rewriting the DB row
 * @param {string} table - label used for logging only
 * @returns {Promise<boolean>} true when the row was uploaded, false when skipped
 */
function rowProcess(row, bucket, updateContentFn, table) {
  const { id, content } = row
  // Skip rows already migrated ('oss:' prefix) or at/below the size threshold.
  if (String(content).startsWith('oss:') || Buffer.byteLength(content, 'utf8') <= FILE_SIZE_THRESHOLD) {
    return Promise.resolve(false)
  }
  // FIXME(review): `${row.fileId}-${row.fileId}` repeats fileId; the otherwise
  // unused userId selected by atextMigration suggests the second segment was
  // meant to be userId — confirm before changing the key format.
  const ossKey = row.guid ? `${row.guid}-${uuid()}` : `${uuid()}-${row.fileId}-${row.fileId}`
  // Return the chain directly instead of wrapping it in new Promise
  // (explicit-construction anti-pattern); rejections propagate to the caller.
  return uploadToOSS(bucket, ossKey, content)
    .then(() => updateContentFn(id, ossKey))
    .then(() => {
      console.log(`[${table}] id: ${id} has been upload to oss bucket: ${bucket}`)
      return true
    })
}
/**
 * Point an atext row at its uploaded OSS object ('oss:<key>').
 * @param {number} id - atext primary key
 * @param {string} ossKey - object key within the atext bucket
 * @returns {Promise<void>}
 */
function updateATextContent(id, ossKey) {
  return new Promise((resolve, reject) => {
    pool.query('UPDATE atext SET content = ? WHERE id = ?', [`oss:${ossKey}`, id], function(err) {
      // bug fix: original fell through and called resolve() after reject(err)
      if (err) return reject(err)
      resolve()
    })
  })
}
/**
 * Point a doc_histories row at its uploaded OSS object ('oss:<key>').
 * @param {number} id - doc_histories primary key
 * @param {string} ossKey - object key within the doc_histories bucket
 * @returns {Promise<void>}
 */
function updateDocHistoriesContent(id, ossKey) {
  return new Promise((resolve, reject) => {
    pool.query('UPDATE doc_histories SET content = ? WHERE id = ?', [`oss:${ossKey}`, id], function(err) {
      // bug fix: original fell through and called resolve() after reject(err)
      if (err) return reject(err)
      resolve()
    })
  })
}
/**
 * Upload a content blob to OSS (promisified wrapper over the callback API).
 * @param {string} bucket - destination bucket name
 * @param {string} ossKey - object key
 * @param {string} content - body to store
 * @returns {Promise<object>} the putObject response data
 */
function uploadToOSS(bucket, ossKey, content) {
  return new Promise((resolve, reject) => {
    oss.putObject({
      Bucket: bucket,
      Key: ossKey,
      Body: content
    }, function (err, data) {
      // bug fix: original fell through and called resolve(data) after reject(err)
      if (err) {
        return reject(err)
      }
      resolve(data)
    });
  })
}
// Kick off the migration; the empty .then() was a no-op and has been removed.
migration(LIMIT).catch(console.error)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment