Last active
May 23, 2022 19:23
-
-
Save jakawell/ff882f90d08a810cd9ee340448d2c59e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* eslint-disable no-console */ | |
import { PassThrough, Transform } from 'stream'; | |
import { pipeline } from 'stream/promises'; | |
type ChunkyBoi = { thingy: number } | |
class OptimusPrime extends Transform { | |
constructor() { | |
super({ objectMode: true }); | |
} | |
_transform( | |
chunk: ChunkyBoi, | |
_encoding: string, | |
callback: (err?: Error, chunk?: ChunkyBoi) => void, | |
): void { | |
if (chunk.thingy < 0) { | |
callback(null, null); | |
} else { | |
callback(null, chunk); | |
} | |
} | |
} | |
async function pipingTest(usePipeline: boolean, chunks: Array<ChunkyBoi>): Promise<void> { | |
// source stream | |
const stream = new PassThrough({ objectMode: true }); | |
chunks.forEach(chunk => stream.write(chunk)); | |
stream.end(); | |
// transformer that filters some results | |
const transform = new OptimusPrime(); | |
// output stream | |
const resultStream = new PassThrough({ objectMode: true }); | |
if (usePipeline) { | |
resultStream.on('error', (err) => { | |
console.log('Got error:', err); | |
}); | |
resultStream.on('data', () => { | |
console.log('Got data...'); | |
}); | |
resultStream.on('end', () => { | |
console.log('Ended.'); | |
}); | |
console.log('Piping setup completed.'); | |
await pipeline( | |
stream, | |
transform, | |
resultStream, | |
); | |
console.log('Pipeline completed.'); | |
} else { | |
stream.pipe(transform).pipe(resultStream); | |
console.log('Piping setup completed.'); | |
await new Promise<void>((resolve, reject) => { | |
resultStream.on('error', (err) => { | |
console.log('Got error:', err); | |
reject(err); | |
}); | |
resultStream.on('data', () => { | |
console.log('Got data...'); | |
}); | |
resultStream.on('end', () => { | |
console.log('Ended.'); | |
resolve(); | |
}); | |
}); | |
console.log('Piping completed.'); | |
} | |
} | |
// eslint-disable-next-line @typescript-eslint/no-floating-promises | |
(async () => { | |
const chunksWithOutput: Array<ChunkyBoi> = [ | |
{ thingy: 1 }, | |
{ thingy: 0 }, | |
{ thingy: -1 }, | |
]; | |
const chunksWithoutOutput: Array<ChunkyBoi> = [ | |
{ thingy: -1 }, | |
]; | |
console.log('Pipeline, with output chunks:'); | |
await pipingTest(true, chunksWithOutput); | |
console.log(''); | |
console.log('Basic piping, with output chunks:'); | |
await pipingTest(false, chunksWithOutput); | |
console.log(''); | |
console.log('Pipeline, with NO output chunks:'); | |
await pipingTest(true, chunksWithoutOutput); | |
console.log(''); | |
console.log('Basic piping, with NO output chunks:'); | |
await pipingTest(false, chunksWithoutOutput); | |
console.log(''); | |
console.log('Finished test run.'); | |
})(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment