Upload content in atext and doc_histories larger than the threshold to OSS
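// Migration script: streams rows from `atext` and `doc_histories`, uploads any
// `content` larger than FILE_SIZE_THRESHOLD to OSS, and rewrites the column to
// an `oss:<key>` pointer. Progress is checkpointed to the *_trace_id.log files
// so an interrupted run can resume, and reverse UPDATE statements are appended
// to rollback.sql.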
const oss = require('./index')
const mysql = require('mysql')
const fs = require('fs')
const { v4: uuid } = require('uuid')
const EventEmitter = require('events')
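// `oss` (./index) is expected to export a client with an S3-style
// putObject({ Bucket, Key, Body }, callback) API; see uploadToOSS below.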
const ATEXT_TRACE_FILE = 'atext_trace_id.log'
const DOC_HISTORIES_TRACE_FILE = 'docHistories_trace_id.log'
const rollbackWriterStream = fs.createWriteStream('rollback.sql')
const emitter = new EventEmitter()
const HOST = 'localhost'
const USER = 'root'
const PASSWORD = '123456'
const DATABASE = 'cow_dev'
const queryAtextConnection = mysql.createConnection({
  host: HOST,
  user: USER,
  password: PASSWORD,
  database: DATABASE
});
const queryDocHistoriesConnection = mysql.createConnection({
  host: HOST,
  user: USER,
  password: PASSWORD,
  database: DATABASE
});
const pool = mysql.createPool({
  connectionLimit: 5,
  host: HOST,
  user: USER,
  password: PASSWORD,
  database: DATABASE
});
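// Two dedicated connections stream the SELECT results (a streaming query
// occupies its connection until it ends), while the pool serves the UPDATE
// statements issued while a stream is paused.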
const ATEXT_BUCKET = 'dev-a-text'
const DOC_HISTORIES_BUCKET = 'dev-doc-histories'
const LIMIT = 1000
const FILE_SIZE_THRESHOLD = 10240
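// Rows are fetched in batches of LIMIT; only content larger than
// FILE_SIZE_THRESHOLD bytes (10 KiB) is offloaded to OSS.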
function getLastId(filePath) {
  return new Promise((resolve, reject) => {
    // 'a+' creates the trace file on first run without truncating it.
    fs.open(filePath, 'a+', function(err, fd) {
      if (err) {
        return reject(err)
      }
      const data = fs.readFileSync(filePath).toString()
      fs.closeSync(fd)
      resolve(data !== '' ? Number(data) : 0)
    });
  })
}
function traceLastId(filePath, lastId) {
  // Persist the checkpoint so an interrupted run can resume where it left off.
  fs.writeFileSync(filePath, String(lastId))
}
async function migration(limit) {
  try {
    const [atextLastId, docHistoriesLastId] = await Promise.all([
      getLastId(ATEXT_TRACE_FILE),
      getLastId(DOC_HISTORIES_TRACE_FILE)
    ])
    rollbackWriterStream.on('error', (err) => {
      console.error('write stream error', err)
    })
    let done = 2
    emitter.on('migration_done', () => {
      if (--done === 0) {
        console.log('all done!')
        // Release the MySQL connections and the pool so the process can exit.
        queryAtextConnection.end()
        queryDocHistoriesConnection.end()
        pool.end()
        rollbackWriterStream.end()
      }
    })
    docHistoriesMigration(docHistoriesLastId, limit)
    atextMigration(atextLastId, limit)
  } catch (err) {
    console.error(err)
  }
}
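// Both migrations use keyset pagination: fetch one batch WHERE id > lastId,
// process it row by row (pausing the stream during I/O), then recurse with
// the highest id seen. An empty batch means the table is finished.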
function docHistoriesMigration(lastId, limit) {
  let index = 0
  let _lastId = lastId
  const query = queryDocHistoriesConnection.query(`SELECT content, file_guid as guid, id FROM doc_histories
    WHERE id > ${lastId} AND history_type = 2 LIMIT ${limit}`)
  query
    .on('error', function(err) {
      console.error('mysql query error', err)
    })
    .on('result', function(row) {
      index++
      _lastId = row.id
      traceLastId(DOC_HISTORIES_TRACE_FILE, _lastId)
      // Pausing the connection is useful if your processing involves I/O
      queryDocHistoriesConnection.pause();
      rowProcess(row, DOC_HISTORIES_BUCKET, updateDocHistoriesContent, 'doc_histories')
        .then(uploadedToOSS => {
          if (uploadedToOSS) {
            // mysql.escape() quotes the original content so the rollback statement stays valid SQL.
            rollbackWriterStream.write(`UPDATE doc_histories SET content = ${mysql.escape(row.content)} WHERE id = ${row.id};\n`, 'utf8')
          }
          queryDocHistoriesConnection.resume()
        })
        .catch((err) => {
          console.error(`[doc_histories] id: ${row.id} rowProcess error`, err)
          // Resume even on failure so the stream does not stall.
          queryDocHistoriesConnection.resume()
        });
    })
    .on('end', function() {
      // All rows of this batch have been received.
      if (index > 0) {
        docHistoriesMigration(_lastId, limit)
      } else {
        console.log('docHistories migration done!')
        emitter.emit('migration_done')
      }
    });
}
function atextMigration(lastId, limit) {
  let index = 0
  let _lastId = lastId
  const query = queryAtextConnection.query(`SELECT id, content, file_id as fileId, user_id as userId FROM atext
    WHERE id > ${lastId} LIMIT ${limit}`)
  query
    .on('error', function(err) {
      console.error('mysql query error', err)
    })
    .on('result', function(row) {
      index++
      _lastId = row.id
      traceLastId(ATEXT_TRACE_FILE, _lastId)
      // Pausing the connection is useful if your processing involves I/O
      queryAtextConnection.pause();
      rowProcess(row, ATEXT_BUCKET, updateATextContent, 'atext')
        .then(uploadedToOSS => {
          if (uploadedToOSS) {
            rollbackWriterStream.write(`UPDATE atext SET content = ${mysql.escape(row.content)} WHERE id = ${row.id};\n`, 'utf8')
          }
          queryAtextConnection.resume()
        })
        .catch((err) => {
          console.error(`[atext] id: ${row.id} rowProcess error`, err)
          // Resume even on failure so the stream does not stall.
          queryAtextConnection.resume()
        });
    })
    .on('end', function() {
      // All rows of this batch have been received.
      if (index > 0) {
        atextMigration(_lastId, limit)
      } else {
        console.log('atext migration done!')
        emitter.emit('migration_done')
      }
    });
}
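// Uploads one row's content to OSS when it exceeds the threshold and has not
// been migrated yet (i.e. is not already an "oss:" pointer), then points the
// DB row at the object. Resolves true when an upload happened, false when the
// row was skipped.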
function rowProcess(row, bucket, updateContentFn, table) {
  const content = row.content
  const id = row.id
  return new Promise((resolve, reject) => {
    // Skip rows already migrated and rows below the size threshold.
    if (!String(content).startsWith('oss:') && Buffer.byteLength(content, 'utf8') > FILE_SIZE_THRESHOLD) {
      // doc_histories rows carry a guid; atext rows carry fileId/userId.
      const ossKey = row.guid ? `${row.guid}-${uuid()}` : `${uuid()}-${row.fileId}-${row.userId}`
      uploadToOSS(bucket, ossKey, content)
        .then(() => updateContentFn(id, ossKey))
        .then(() => {
          console.log(`[${table}] id: ${id} has been uploaded to oss bucket: ${bucket}`)
          resolve(true)
        })
        .catch(reject)
    } else {
      resolve(false)
    }
  })
}
function updateATextContent(id, ossKey) {
  return new Promise((resolve, reject) => {
    pool.query('UPDATE atext SET content = ? WHERE id = ?', [`oss:${ossKey}`, id], function(err) {
      if (err) return reject(err)
      resolve()
    })
  })
}
function updateDocHistoriesContent(id, ossKey) {
  return new Promise((resolve, reject) => {
    pool.query('UPDATE doc_histories SET content = ? WHERE id = ?', [`oss:${ossKey}`, id], function(err) {
      if (err) return reject(err)
      resolve()
    })
  })
}
function uploadToOSS(bucket, ossKey, content) {
  return new Promise((resolve, reject) => {
    oss.putObject({
      Bucket: bucket,
      Key: ossKey,
      Body: content
    }, function (err, data) {
      if (err) {
        return reject(err)
      }
      resolve(data)
    });
  })
}
migration(LIMIT).catch(console.error)
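// Usage (assuming this file is saved as migrate.js next to the oss client):
//   node migrate.js
// To undo the content rewrites, replay the generated SQL against the same
// database, e.g.: mysql -h localhost -u root -p cow_dev < rollback.sql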