Skip to content

Instantly share code, notes, and snippets.

@sstur
Last active February 13, 2018 03:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sstur/26df66e2a7e201facad2368aeafa8fea to your computer and use it in GitHub Desktop.
Save sstur/26df66e2a7e201facad2368aeafa8fea to your computer and use it in GitHub Desktop.
[RFC] Node Streams to Promise-based Streams
// @flow
import fs from 'fs';
import {join} from 'path';
async function main() {
let filePath = join(__dirname, '../assets/image.jpg');
// The first one uses Promise-based readable stream.
await getFileSizeOne(filePath);
// The second one uses "for await" async iterator approach.
await getFileSizeTwo(filePath);
}
main();
async function getFileSizeOne(filePath) {
let reader = getReader(fs.createReadStream(filePath));
let total = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
let {done, value} = await reader.read();
if (done) {
break;
}
let length = value ? value.length : 0;
console.log(`Received ${length} bytes of data.`);
total += length;
}
console.log(`Done. Total bytes: ${total}`);
return total;
}
async function getFileSizeTwo(filePath) {
let reader = getReader(fs.createReadStream(filePath));
let asyncReader = getAsyncIterable(() => reader.read());
let total = 0;
for await (let chunk of asyncReader) {
console.log(`Received ${chunk.length} bytes of data.`);
total += chunk.length;
}
console.log(`Done. Total bytes: ${total}`);
return total;
}
function getReader(readStream) {
let isFinished = false;
readStream.on('end', () => {
isFinished = true;
});
function tryRead(callback) {
if (isFinished) {
callback({done: true, value: undefined});
return;
}
let chunk = readStream.read();
if (chunk === null) {
// The stream will either end or become readable.
raceOnce(readStream, ['readable', 'end'], () => tryRead(callback));
} else {
callback({done: false, value: chunk});
}
}
return {
read: () => new Promise(tryRead),
};
}
function raceOnce(
emitter: *,
eventNames: Array<string>,
callback: (mixed) => void,
) {
let eventHandler = (result) => {
for (let eventName of eventNames) {
emitter.removeListener(eventName, eventHandler);
}
callback(result);
};
for (let eventName of eventNames) {
emitter.addListener(eventName, eventHandler);
}
}
function getAsyncIterable(next): $AsyncIterable<*, *, *> {
let asyncIterator = {
// $FlowIssue
[Symbol.asyncIterator]: () => asyncIterator,
next: next,
};
// $FlowIssue
return asyncIterator;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment