Skip to content

Instantly share code, notes, and snippets.

@jakawell
Last active May 23, 2022 19:23
Show Gist options
  • Save jakawell/ff882f90d08a810cd9ee340448d2c59e to your computer and use it in GitHub Desktop.
Save jakawell/ff882f90d08a810cd9ee340448d2c59e to your computer and use it in GitHub Desktop.
/* eslint-disable no-console */
import { PassThrough, Transform } from 'stream';
import { pipeline } from 'stream/promises';
/** Shape of the objects that flow through the object-mode streams in this file. */
type ChunkyBoi = { thingy: number };

/**
 * Object-mode Transform that works as a filter: chunks whose `thingy` is
 * negative are dropped, every other chunk is forwarded unchanged.
 */
class OptimusPrime extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  /**
   * Handles one written chunk, either forwarding it or discarding it.
   *
   * @param chunk - The object written to the writable side.
   * @param _encoding - Unused; chunks are objects, not strings.
   * @param callback - Completion callback. Called with `(null, chunk)` to
   *   forward the chunk, or with no arguments to emit nothing for it.
   */
  _transform(
    chunk: ChunkyBoi,
    _encoding: string,
    callback: (err?: Error | null, chunk?: ChunkyBoi) => void,
  ): void {
    if (chunk.thingy < 0) {
      // Complete without data so nothing is pushed for this chunk. Passing an
      // explicit `null` as data would only work thanks to a guard inside
      // Node's Transform, and does not type-check under strictNullChecks.
      callback();
      return;
    }
    callback(null, chunk);
  }
}
/**
 * Sends `chunks` through source stream -> OptimusPrime filter -> result stream
 * and waits for the result stream to finish, logging progress to the console.
 *
 * @param usePipeline - When true the streams are connected with `pipeline()`
 *   from `stream/promises`; otherwise with chained `.pipe()` calls.
 * @param chunks - Objects written to the source stream before it is ended.
 * @returns A promise that resolves once the run has completed and rejects if
 *   one of the streams errors.
 */
async function pipingTest(usePipeline: boolean, chunks: Array<ChunkyBoi>): Promise<void> {
  // source stream: everything is written and the stream ended before wiring
  const stream = new PassThrough({ objectMode: true });
  for (const chunk of chunks) {
    stream.write(chunk);
  }
  stream.end();

  // transformer that filters some results
  const transform = new OptimusPrime();

  // output stream
  const resultStream = new PassThrough({ objectMode: true });

  if (usePipeline) {
    resultStream.on('error', (err) => {
      console.log('Got error:', err);
    });
    resultStream.on('data', () => {
      console.log('Got data...');
    });
    resultStream.on('end', () => {
      console.log('Ended.');
    });
    console.log('Piping setup completed.');

    await pipeline(stream, transform, resultStream);
    console.log('Pipeline completed.');
    return;
  }

  stream.pipe(transform).pipe(resultStream);
  console.log('Piping setup completed.');

  await new Promise<void>((resolve, reject) => {
    const stages = [stream, transform, resultStream];

    const fail = (err: Error): void => {
      console.log('Got error:', err);
      // .pipe() leaves the other stages open after a failure; tear them down
      // so nothing lingers. destroy() without an argument emits no new error.
      for (const stage of stages) {
        stage.destroy();
      }
      reject(err);
    };

    // .pipe() does not forward errors downstream, so each stage needs its own
    // listener; otherwise an upstream error is an unhandled 'error' event and
    // this promise never settles.
    for (const stage of stages) {
      stage.on('error', fail);
    }

    resultStream.on('data', () => {
      console.log('Got data...');
    });
    resultStream.on('end', () => {
      console.log('Ended.');
      resolve();
    });
  });
  console.log('Piping completed.');
}
// Script entry point: runs every wiring/fixture combination one after another.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
(async () => {
  // Fixture where some chunks pass the filter (only the negative one is dropped).
  const chunksWithOutput: Array<ChunkyBoi> = [{ thingy: 1 }, { thingy: 0 }, { thingy: -1 }];
  // Fixture where the filter drops everything, so no chunk reaches the output.
  const chunksWithoutOutput: Array<ChunkyBoi> = [{ thingy: -1 }];

  // Scenarios in execution order: heading to print, wiring mode, input fixture.
  const scenarios: Array<{ title: string; usePipeline: boolean; chunks: Array<ChunkyBoi> }> = [
    { title: 'Pipeline, with output chunks:', usePipeline: true, chunks: chunksWithOutput },
    { title: 'Basic piping, with output chunks:', usePipeline: false, chunks: chunksWithOutput },
    { title: 'Pipeline, with NO output chunks:', usePipeline: true, chunks: chunksWithoutOutput },
    { title: 'Basic piping, with NO output chunks:', usePipeline: false, chunks: chunksWithoutOutput },
  ];

  for (const { title, usePipeline, chunks } of scenarios) {
    console.log(title);
    await pipingTest(usePipeline, chunks);
    console.log('');
  }

  console.log('Finished test run.');
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment