Last active
February 13, 2018 03:23
-
-
Save sstur/26df66e2a7e201facad2368aeafa8fea to your computer and use it in GitHub Desktop.
[RFC] Node Streams to Promise-based Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
import fs from 'fs'; | |
import {join} from 'path'; | |
/**
 * Entry point: measures the size of the sample image twice, once with each
 * of the two stream-reading approaches, awaiting them one after the other.
 */
async function main() {
  const imagePath = join(__dirname, '../assets/image.jpg');

  // Approach 1: pull chunks explicitly from the Promise-based reader.
  await getFileSizeOne(imagePath);

  // Approach 2: consume the same kind of reader through "for await".
  await getFileSizeTwo(imagePath);
}
// Run the demo. main() is async, so a failure surfaces as a rejected promise;
// report it and signal failure through the exit code rather than leaving the
// rejection unhandled.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
/**
 * Computes the size of a file by pulling chunks one at a time from the
 * Promise-based reader returned by getReader().
 *
 * Logs the length of every chunk received and the final total.
 *
 * @param filePath Path of the file to read.
 * @returns A Promise resolving to the sum of the lengths of all chunks.
 */
async function getFileSizeOne(filePath) {
  const reader = getReader(fs.createReadStream(filePath));
  let total = 0;

  // Prime the loop with the first read, then keep reading until `done`.
  let result = await reader.read();
  while (!result.done) {
    // A missing value counts as zero bytes.
    const length = result.value ? result.value.length : 0;
    console.log(`Received ${length} bytes of data.`);
    total += length;
    result = await reader.read();
  }

  console.log(`Done. Total bytes: ${total}`);
  return total;
}
/**
 * Computes the size of a file by adapting the Promise-based reader to an
 * async iterable and consuming it with "for await".
 *
 * Logs the length of every chunk received and the final total.
 *
 * @param filePath Path of the file to read.
 * @returns A Promise resolving to the sum of the lengths of all chunks.
 */
async function getFileSizeTwo(filePath) {
  const reader = getReader(fs.createReadStream(filePath));
  // Each iteration step simply delegates to the reader's read().
  const chunks = getAsyncIterable(() => reader.read());

  let byteCount = 0;
  for await (const chunk of chunks) {
    console.log(`Received ${chunk.length} bytes of data.`);
    byteCount += chunk.length;
  }

  console.log(`Done. Total bytes: ${byteCount}`);
  return byteCount;
}
/**
 * Wraps a Node readable stream in a minimal Promise-based reader.
 *
 * The returned object has a single method, `read()`, which returns a Promise:
 *   - resolves to `{done: false, value: chunk}` for each chunk obtained from
 *     `readStream.read()`;
 *   - resolves to `{done: true, value: undefined}` once the stream has
 *     emitted 'end';
 *   - rejects with the stream's error once the stream has emitted 'error'
 *     (both for a read that is pending at that moment and for later reads).
 *
 * @param readStream The readable stream to pull chunks from.
 * @returns An object of the form `{read: () => Promise<{done, value}>}`.
 */
function getReader(readStream) {
  let isFinished = false;
  let hasFailed = false;
  let failure;

  readStream.on('end', () => {
    isFinished = true;
  });
  // This listener is attached before any listener added by raceOnce() below,
  // so the failure is already recorded when a waiting read is retried.
  readStream.on('error', (error) => {
    hasFailed = true;
    failure = error;
  });

  // Promise executor: settles the promise now if possible, otherwise waits
  // for the next relevant stream event and tries again.
  function tryRead(resolve, reject) {
    if (hasFailed) {
      reject(failure);
      return;
    }
    if (isFinished) {
      resolve({done: true, value: undefined});
      return;
    }
    let chunk = readStream.read();
    if (chunk === null) {
      // Nothing available yet: the stream will become readable, end, or fail.
      raceOnce(readStream, ['readable', 'end', 'error'], () =>
        tryRead(resolve, reject),
      );
    } else {
      resolve({done: false, value: chunk});
    }
  }

  return {
    read: () => new Promise(tryRead),
  };
}
function raceOnce( | |
emitter: *, | |
eventNames: Array<string>, | |
callback: (mixed) => void, | |
) { | |
let eventHandler = (result) => { | |
for (let eventName of eventNames) { | |
emitter.removeListener(eventName, eventHandler); | |
} | |
callback(result); | |
}; | |
for (let eventName of eventNames) { | |
emitter.addListener(eventName, eventHandler); | |
} | |
} | |
function getAsyncIterable(next): $AsyncIterable<*, *, *> { | |
let asyncIterator = { | |
// $FlowIssue | |
[Symbol.asyncIterator]: () => asyncIterator, | |
next: next, | |
}; | |
// $FlowIssue | |
return asyncIterator; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment