Skip to content

Instantly share code, notes, and snippets.

@bsingr
Created November 20, 2019 11:37
Show Gist options
  • Save bsingr/32c7792df163206416bccbd49fb5254e to your computer and use it in GitHub Desktop.
Save bsingr/32c7792df163206416bccbd49fb5254e to your computer and use it in GitHub Desktop.
streams experiment
const { Readable, Transform, pipeline } = require('stream')
/**
 * Builds an endless object-mode Readable that emits `{ count }` objects.
 *
 * The counter starts at 1 and grows by one every time the stream asks for
 * more data; each request is also logged to stdout as `read <count>`.
 * The stream never signals end-of-data.
 *
 * @returns {Readable} object-mode stream of `{ count: number }` items
 */
const createCounterReader = () => {
  let emitted = 0;

  // Invoked by the stream machinery with `this` bound to the Readable.
  function produceNext() {
    emitted += 1;
    console.log('read', emitted);
    this.push({ count: emitted });
  }

  return new Readable({ objectMode: true, read: produceNext });
};
/**
 * Returns a promise that fulfills (with no value) after `delay` milliseconds.
 *
 * @param {number} delay - time to wait, in milliseconds
 * @returns {Promise<void>} promise settled by a `setTimeout` timer
 */
const sleep = (delay) => new Promise((wake) => setTimeout(wake, delay));
/**
 * Runs the streams experiment: pipes the endless counter source into a slow
 * object-mode Transform that takes one second per chunk and emits a numbered,
 * JSON-formatted line for each one.
 *
 * The streams are connected with `pipeline` so that an error in either stream
 * destroys both and rejects the returned promise (plain `.pipe()` does not
 * forward errors).
 *
 * NOTE: nothing consumes the transform's output here, so once the internal
 * buffers fill up backpressure is expected to stall the chain; the source
 * never ends, so the promise only settles if a stream fails.
 *
 * @returns {Promise<void>} rejects with the first stream error
 */
async function main() {
  const readable = createCounterReader();
  let counter = 0;

  const transform = new Transform({
    objectMode: true,
    highWaterMark: 10,
    async transform(chunk, encoding, callback) {
      let line;
      try {
        const { count } = chunk;
        console.log('transform:', count);
        await sleep(1000);
        console.log('transformEnd:', count);
        counter += 1;
        line = `${counter}. ${JSON.stringify(chunk)} \n`;
      } catch (error) {
        // Report failures through the stream instead of leaving an
        // unhandled rejection and a callback that never fires.
        callback(error);
        return;
      }
      callback(null, line);
    },
  });

  // Chunks are processed sequentially, flowing downstream (source -> transform).
  await new Promise((resolve, reject) => {
    pipeline(readable, transform, (error) => {
      if (error) {
        reject(error);
      } else {
        resolve();
      }
    });
  });
}
// Start the experiment; a rejection from main() is printed to stderr.
main().catch((error) => {
  console.error(error.toString());
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment