Instantly share code, notes, and snippets.

Embed
What would you like to do?
逐条对mysql数据进行处理
var stream = require('stream');
var mysql = require('mysql');
var connection = mysql.createConnection({
host: 'XX',
user: 'XX',
password: 'XX',
database: 'XX'
});
var errList = [];
connection.connect();
connection.query('select id,fans_id,fans_nickname,buyer_id,fans_type,fans_info from yztrade')
.stream()
.pipe(stream.Transform({
objectMode: true,
transform: function(row, encoding, callback) {
/**
if (!!row.fans_id) {
callback()
} else {
/**/
try {
var fans = JSON.parse(row.fans_info);
console.log(fans.fans_id);
var query = `UPDATE yztrade SET fans_id=${fans.fans_id},fans_nickname='${fans.fans_nickname}',buyer_id=${fans.buyer_id},fans_type=${fans.fans_type} where id=${row.id}`;
connection.query(query, function(err) {
if (err) {
errList.push(row.id);
}
console.log(`完成id: ${row.id}`)
callback();
});
} catch (err) {
errList.push(row.id);
callback();
}
// }
}
}))
.on('finish', function() {
connection.end();
if (errList.length) {
console.log('未正常处理的id为:')
console.log(errList)
}
console.log('end')
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment