Instantly share code, notes, and snippets.

Embed
What would you like to do?
Goodbye Transform-Streams, long live ES9 Async Generators
/**
 * Lifts a per-chunk callback into an async-generator stage.
 *
 * The returned generator function consumes any (async) iterable, awaits
 * `cb(chunk)` for each chunk in order, and yields the result. A result of
 * `undefined` means "drop this chunk"; every other value, including `null`,
 * `0`, `''` and `false`, is passed downstream.
 *
 * @param {function(*): *} cb - Sync or async transform applied to each chunk.
 * @returns {function(*): AsyncGenerator} Async generator function taking the source iterable.
 */
const asyncGeneratorsFactory = function (cb) {
  return async function* (asyncIterable) {
    for await (const chunk of asyncIterable) {
      const res = await cb(chunk);
      // undefined is the filter signal: skip without emitting anything
      if (res === undefined) {
        continue;
      }
      yield res;
    }
  };
};
exports.asyncGeneratorsFactory = asyncGeneratorsFactory;
const delay = require('./delay');
/**
 * Builds an endless async iterable of increasing integers.
 *
 * Each iteration started on the returned object gets its own counter that
 * begins at 0; before every value the generator awaits `delay(1)`.
 *
 * @returns {{[Symbol.asyncIterator]: function(): AsyncGenerator<number>}} Async iterable object.
 */
function asyncIterablesFactory() {
  async function* countUp() {
    // no exit condition: the consumer decides when to stop iterating
    for (let n = 0; ; n += 1) {
      await delay(1);
      yield n;
    }
  }

  return { [Symbol.asyncIterator]: countUp };
}
exports.asyncIterablesFactory = asyncIterablesFactory;
/**
 * Promise-based timer.
 *
 * @param {number} n - How long to wait, in seconds.
 * @returns {Promise<number>} Resolves with `n` once `n * 1000` milliseconds have elapsed.
 */
const delay = function (n) {
  const milliseconds = n * 1000;
  return new Promise((resolve) => {
    // extra setTimeout arguments are handed to the callback, so resolve receives n
    setTimeout(resolve, milliseconds, n);
  });
};
module.exports = delay;
// custom modules imports
const { asyncGeneratorsFactory } = require('./asyncGeneratorsFactory');
const { asyncIterablesFactory } = require('./asyncIterablesFactory');
const delay = require('./delay');
const pipe = require('./pipe');
// create an async iterable that produces numbers: per asyncIterablesFactory it
// counts up from 0 and awaits delay(1) before each value, never finishing
const asyncIterable = asyncIterablesFactory();
// create some functions to handle each async number and then...
/**
 * Filter callback: keeps odd numbers and drops everything else.
 *
 * @param {number} n - Candidate value.
 * @returns {number|undefined} `n` when `n % 2` is truthy, otherwise `undefined`.
 */
const onlyOdd = function (n) {
  const hasRemainder = Boolean(n % 2);
  return hasRemainder ? n : undefined;
};
/**
 * Pass-through callback that holds each value back before releasing it.
 *
 * @param {*} n - Value to forward unchanged.
 * @returns {Promise<*>} Resolves with `n` after `delay(2)` has settled.
 */
const pause = async function (n) {
  const pauseLength = 2; // argument handed to delay()
  await delay(pauseLength);
  return n;
};
/**
 * Converts a chunk to its string form so it can be written to a text stream.
 *
 * @param {*} n - Value to convert.
 * @returns {string} Result of `String(n)`.
 */
const toString = function (n) {
  const text = String(n);
  return text;
};
/**
 * Terminates a chunk with the platform's end-of-line marker.
 *
 * @param {string} str - Text to terminate.
 * @returns {string} `str` followed by `os.EOL`.
 */
const newLine = function (str) {
  const { EOL } = require('os');
  return str + EOL;
};
// ...create the async generator version of them, in the same order as the
// callbacks are listed
const [onlyOddAsyncGen, pauseAsyncGen, toStringGen, newLineGen] = [
  onlyOdd,
  pause,
  toString,
  newLine,
].map((callback) => asyncGeneratorsFactory(callback));
// start the iteration using my pipe function
(async function IIAFE() {
  // pipe resolves with the output stream once its source is exhausted
  await pipe(asyncIterable, process.stdout, onlyOddAsyncGen, pauseAsyncGen, toStringGen, newLineGen);
})().catch((err) => {
  // a failure in any stage rejects the pipe; report it and flag a failing
  // exit code instead of leaving the promise without a rejection handler
  console.error(err);
  process.exitCode = 1;
});
/**
 * Writes every chunk of an (async) iterable to a sink, optionally routing the
 * chunks through a chain of modifiers first.
 *
 * Modifiers are applied left to right: the first receives `inputStream`, each
 * following one receives the previous one's return value, and the final value
 * is iterated with `for await`.
 *
 * Backpressure: when `outputStream.write(chunk)` returns exactly `false` and
 * the sink exposes `once`, iteration is suspended until the sink emits
 * `'drain'`. An `'error'` emitted during that wait rejects the returned
 * promise. Sinks whose `write` returns anything else are written to without
 * waiting.
 *
 * @param {*} inputStream - Source iterable (sync or async).
 * @param {{write: function(*): *}} outputStream - Sink receiving each chunk.
 * @param {...function(*): *} modifiers - Stages mapping one iterable to the next.
 * @returns {Promise<*>} Resolves with `outputStream` once the source is exhausted.
 */
const pipe = (function () {
  // left-to-right composition; with no functions the input comes back unchanged
  const functionalPipe = (fns) => (x) => fns.reduce((v, f) => f(v), x);

  // settles on whichever of 'drain' / 'error' fires first, then detaches both listeners
  const waitForDrain = (stream) =>
    new Promise((resolve, reject) => {
      const detach = () => {
        stream.removeListener('drain', onDrain);
        stream.removeListener('error', onError);
      };
      const onDrain = () => {
        detach();
        resolve();
      };
      const onError = (err) => {
        detach();
        reject(err);
      };
      stream.once('drain', onDrain);
      stream.once('error', onError);
    });

  return async function pipe(inputStream, outputStream, ...modifiers) {
    const asyncIterable = functionalPipe(modifiers)(inputStream);
    for await (const chunk of asyncIterable) {
      const accepted = outputStream.write(chunk);
      // strict comparison: a write() that returns nothing must not stall the loop
      if (accepted === false && typeof outputStream.once === 'function') {
        await waitForDrain(outputStream);
      }
    }
    return outputStream;
  };
})();
module.exports = pipe;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment