@gemmadlou
Created August 21, 2019 12:46
Processing a large CSV file with backpressure: rows from data.csv are parsed with csvtojson and written to DynamoDB in batches of 100, with the write stream's callback holding the parser back while each batch is in flight.
// model.js: Dynamoose model for the target DynamoDB table (dynamoose v1 API).
const dynamoose = require('dynamoose');

dynamoose.AWS.config.update({
  region: 'eu-west-1'
});

// SOMEKEY is the table's hash key; SOMEDATA holds the row payload.
const schema = new dynamoose.Schema({
  SOMEKEY: {
    type: String,
    hashKey: true,
  },
  SOMEDATA: String
});

const Model = dynamoose.model('dynamotable', schema);

module.exports = Model;
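For reference, a minimal sketch of writing a single batch with this model. The item values here are made up, but batchPut is the same promise-returning method the import script below relies on:

const Model = require('./model')

// Hypothetical items; any objects matching the schema above will do.
const items = [
  { SOMEKEY: 'row-1', SOMEDATA: 'first row' },
  { SOMEKEY: 'row-2', SOMEDATA: 'second row' }
]

Model.batchPut(items)
  .then(() => console.log('batch written'))
  .catch((err) => console.error('batch failed', err))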
// Main script: stream data.csv through csvtojson and write it to DynamoDB in
// batches of CHUNKAMOUNT rows. The pending batch write delays the stream
// callback, which makes pipe() pause the CSV parser once the write buffer
// fills; that pause is the backpressure.
const csv = require('csvtojson')
const fs = require('fs')
const stream = require('stream')
const model = require('./model')

const total = 36504517    // rows in data.csv, used only for the progress output
const CHUNKAMOUNT = 100   // rows per DynamoDB batch write
let lineCompleted = 0
let data = []

class WriteStream extends stream.Writable {
  _write(chunk, encoding, callback) {
    // csvtojson emits one JSON string per CSV row.
    data.push(JSON.parse(chunk.toString('utf8')))

    if (data.length < CHUNKAMOUNT) {
      return callback()
    }

    model.batchPut(data)
      .then(() => {
        lineCompleted += data.length
        console.log(new Date(), 'chunk:', data.length, lineCompleted, '/', total,
          'completed:', (lineCompleted / total * 100).toFixed(3), '%')
        data = []
        callback()
      })
      .catch(callback)    // surface DynamoDB errors instead of stalling the stream
  }

  // Write whatever is left over once the CSV has been fully parsed.
  _final(callback) {
    if (data.length === 0) {
      return callback()
    }
    model.batchPut(data)
      .then(() => {
        lineCompleted += data.length
        data = []
        callback()
      })
      .catch(callback)
  }
}

const writeStream = new WriteStream()

process.on('exit', (code) => {
  console.log(`About to exit with code: ${code}`);
});

const readStream = fs.createReadStream('./data.csv', { encoding: 'utf8' });

readStream
  .pipe(csv())
  .pipe(writeStream)
  .on('error', (err) => {
    console.log(err)
  })
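As a variation on the wiring above (not something the original gist does), Node 10's stream.pipeline connects the same stages and reports an error from any of them, whereas the single .on('error') above is only attached to the final write stream, so errors from the file read or the CSV parse would go unhandled:

// Sketch: stream.pipeline destroys every stage and reports the error if any
// of them fails, instead of relying on one .on('error') handler.
const { pipeline } = require('stream')
const fs = require('fs')
const csv = require('csvtojson')
// WriteStream is the Writable class defined above.

pipeline(
  fs.createReadStream('./data.csv', { encoding: 'utf8' }),
  csv(),
  new WriteStream(),
  (err) => {
    if (err) {
      console.error('Import failed', err)
    } else {
      console.log('Import finished')
    }
  }
)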