Skip to content

Instantly share code, notes, and snippets.

@evancauwenberg
Forked from doug-martin/batch_operation.js
Created January 12, 2021 16:31
Show Gist options
  • Save evancauwenberg/93152014ee87b851dcc389f9a6c63d87 to your computer and use it in GitHub Desktop.
const path = require('path');
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('fast-csv');
/**
 * Object-mode Transform stream that buffers incoming records, persists them
 * in batches via `saveBatch()`, and then pushes each saved record downstream.
 *
 * @param {object} [args] - Options spread into the `Transform` constructor
 *   (objectMode defaults to true). `args.batchSize` sets how many buffered
 *   records trigger a save; when it is missing or falsy the batch size is 100.
 */
class PersistStream extends Transform {
  constructor(args) {
    super({ objectMode: true, ...(args || {}) });
    this.batchSize = 100;
    this.batch = [];
    if (args && args.batchSize) {
      this.batchSize = args.batchSize;
    }
  }

  /**
   * Buffers one record. Once the batch is full, persists it before invoking
   * `callback`, so the next record is not transformed until the save settles.
   */
  _transform(record, encoding, callback) {
    this.batch.push(record);
    if (!this.shouldSaveBatch) {
      // Batch is not full yet; acknowledge the record and keep buffering.
      callback();
      return;
    }
    // Hand any persistence error to the stream callback so it surfaces as an
    // 'error' event. The two-argument `then` ensures a throw from the success
    // path is not routed back into `callback` a second time.
    this.processRecords().then(
      () => callback(),
      (err) => callback(err),
    );
  }

  /**
   * Persists any records left over in a final, partial batch before the
   * stream ends; calls `callback` immediately when nothing is buffered.
   */
  _flush(callback) {
    if (this.batch.length === 0) {
      callback();
      return;
    }
    this.processRecords().then(
      () => callback(),
      (err) => callback(err),
    );
  }

  /**
   * Pushes each record to the readable side for downstream consumers.
   * @param {Array} records
   */
  pushRecords(records) {
    records.forEach((r) => this.push(r));
  }

  /**
   * Whether the buffered batch has reached `batchSize`. This example uses a
   * simple record count, but any check could be substituted here.
   * @returns {boolean}
   */
  get shouldSaveBatch() {
    return this.batch.length >= this.batchSize;
  }

  /**
   * Saves the current batch and then emits the saved records downstream.
   * @returns {Promise<Array>} the records that were saved
   */
  async processRecords() {
    const records = await this.saveBatch();
    // Be sure to emit them so downstream consumers see every record.
    this.pushRecords(records);
    return records;
  }

  /**
   * Takes ownership of the current batch (resetting the buffer) and persists
   * it. Here persistence is simulated with a 100 ms timer; replace this with
   * real save/update/delete logic.
   * @returns {Promise<Array>} the records in the batch that was saved
   */
  async saveBatch() {
    const records = this.batch;
    this.batch = [];
    console.log(`Saving batch [noOfRecords=${records.length}]`);
    // This is where you should save/update/delete the records
    return new Promise((res) => {
      setTimeout(() => res(records), 100);
    });
  }
}
/**
 * Reads a CSV file with headers, runs every parsed row through a
 * PersistStream, and counts the rows it emits.
 *
 * Errors from the file read, the CSV parser, or the persist stream all reject
 * the returned promise.
 *
 * @param {{ file: string, batchSize: number }} options
 * @returns {Promise<{ event: string, recordCount: number }>} resolves on 'end'
 */
const processCsv = ({ file, batchSize }) =>
  new Promise((resolve, reject) => {
    let emitted = 0;
    const fail = (err) => reject(err);

    // file read errors
    const source = fs.createReadStream(file).on('error', fail);
    // parsing errors
    const parser = source.pipe(csv.parse({ headers: true })).on('error', fail);
    // batch persistence, counting every record that comes out the other side
    parser
      .pipe(new PersistStream({ batchSize }))
      .on('error', fail)
      .on('data', () => {
        emitted += 1;
      })
      .on('end', () => resolve({ event: 'end', recordCount: emitted }));
  });
const file = path.resolve(__dirname, `batch_write.csv`);
// Persist the sample CSV in batches of 5 records; any final partial batch is
// saved when the persist stream flushes.
processCsv({ file, batchSize: 5 })
  .then(({ event, recordCount }) => {
    console.log(`Done Processing [event=${event}] [recordCount=${recordCount}]`);
  })
  .catch((e) => {
    console.error(e.stack);
    // Report the failure to the shell instead of exiting with status 0.
    process.exitCode = 1;
  });
id first_name last_name address
1 Bob Yukon 1111 State St. Yukon AK
2 Sally Yukon 1111 State St. Yukon AK
3 Bobby Yukon 1111 State St. Yukon AK
4 Jane Yukon 1111 State St. Yukon AK
5 Dick Yukon 1111 State St. Yukon AK
6 John Doe 1112 State St. Yukon AK
7 Jane Doe 1113 State St. Yukon AK
8 Billy Doe 1112 State St. Yukon AK
9 Edith Doe 1112 State St. Yukon AK
10 Bob Yukon 1111 State St. Yukon AK
11 Sally Yukon 1111 State St. Yukon AK
12 Bobby Yukon 1111 State St. Yukon AK
13 Jane Yukon 1111 State St. Yukon AK
14 Dick Yukon 1111 State St. Yukon AK
15 John Doe 1112 State St. Yukon AK
16 Jane Doe 1113 State St. Yukon AK
17 Billy Doe 1112 State St. Yukon AK
18 Edith Doe 1112 State St. Yukon AK
19 Bob Yukon 1111 State St. Yukon AK
20 Sally Yukon 1111 State St. Yukon AK
21 Bobby Yukon 1111 State St. Yukon AK
22 Jane Yukon 1111 State St. Yukon AK
23 Dick Yukon 1111 State St. Yukon AK
24 John Doe 1112 State St. Yukon AK
25 Jane Doe 1113 State St. Yukon AK
26 Billy Doe 1112 State St. Yukon AK
27 Edith Doe 1112 State St. Yukon AK
28 Bob Yukon 1111 State St. Yukon AK
29 Sally Yukon 1111 State St. Yukon AK
30 Bobby Yukon 1111 State St. Yukon AK
31 Jane Yukon 1111 State St. Yukon AK
32 Dick Yukon 1111 State St. Yukon AK
33 John Doe 1112 State St. Yukon AK
34 Jane Doe 1113 State St. Yukon AK
35 Billy Doe 1112 State St. Yukon AK
36 Edith Doe 1112 State St. Yukon AK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment