Skip to content

Instantly share code, notes, and snippets.

@csotiriou
Created May 21, 2020 06:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save csotiriou/a844cc4a18445b78ba8268b29dfccdc3 to your computer and use it in GitHub Desktop.
Save csotiriou/a844cc4a18445b78ba8268b29dfccdc3 to your computer and use it in GitHub Desktop.
Reading a file with backpressure using NodeJS
/**
 * Callback that processes one batch of lines.
 * The value the returned promise resolves with is not inspected, hence `unknown`.
 */
type PromiseThunk = (lineBatch: string[]) => Promise<unknown>;
/**
 * Reads a text file line by line and hands the lines to `promiseThunk` in batches.
 *
 * Backpressure: the next line is only requested from the stream after the promise
 * returned for the current batch has fulfilled, so at most one batch is in flight.
 *
 * @param filePath the file to open
 * @param promiseThunk A function that returns a promise. This function will be called with the current batch of lines as the argument.
 * @param bufferSize how many lines to collect before calling `promiseThunk`; values below 1 behave like 1. The last batch may be shorter.
 * @returns a promise that resolves to `true` once every batch, including the trailing
 * partial one, has been processed. It rejects with the first error emitted by any of the
 * streams (e.g. the file cannot be opened) or thrown/rejected by `promiseThunk`; after a
 * failure no further batches are processed.
 */
export async function streamFile2(filePath: string, promiseThunk: PromiseThunk, bufferSize: number = 100) {
  return new Promise<unknown>((resolve, reject) => {
    //create the NodeJS read stream
    const stream = fs.createReadStream(filePath, { encoding: 'utf8' });
    //lines collected for the batch that is currently being filled
    let buffer: string[] = [];
    let failed = false;

    //reject only once, and stop reading so the file descriptor is released
    const fail = (error: unknown) => {
      if (failed) {
        return;
      }
      failed = true;
      stream.destroy();
      reject(error);
    };

    //hand the collected lines over and start a fresh buffer;
    //being async, a synchronous throw inside promiseThunk also becomes a rejection
    const processBatch = async () => {
      const batch = buffer;
      buffer = [];
      await promiseThunk(batch);
    };

    //pipe() does not forward errors downstream, so every stream gets its own listener
    stream.on('error', fail);
    //ensure parsing line by line
    const lines = stream.pipe(split2());
    lines.on('error', fail);

    lines
      //ensure that the next chunk will be processed by the
      //stream only when we want to
      .pipe(
        through2((chunk, _enc, callback) => {
          //put the chunk along with the other ones
          buffer.push(chunk.toString());
          if (buffer.length < bufferSize) {
            callback(); //next step, no process
            return;
          }
          //process the batch and only then ask for the next chunk; on failure the
          //callback is deliberately never invoked, so no further batch is started
          processBatch().then(() => callback(), fail);
        }),
      )
      .on('error', fail)
      .on('finish', () => {
        if (failed) {
          return;
        }
        //any remaining data still needs to be sent
        //resolve the outer promise only when the final batch has completed processing
        if (buffer.length > 0) {
          processBatch().then(() => resolve(true), fail);
        } else {
          resolve(true);
        }
      });
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment