Last active
December 7, 2018 13:20
Star
You must be signed in to star a gist
Goodbye Transform-Streams, long live ES9 Async Generators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const asyncGeneratorsFactory = function (cb) { | |
return async function* (asyncIterable) { | |
for await (const chunk of asyncIterable) { | |
const res = await cb(chunk); | |
if (typeof res != 'undefined') yield res; | |
} | |
} | |
} | |
exports.asyncGeneratorsFactory = asyncGeneratorsFactory; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const delay = require('./delay'); | |
// simply yield a number each second | |
function asyncIterablesFactory() { | |
return { | |
async *[Symbol.asyncIterator]() { | |
let n = 0; | |
while (1) { | |
await delay(1); | |
yield n++; | |
} | |
} | |
} | |
} | |
exports.asyncIterablesFactory = asyncIterablesFactory; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const delay = function (n) { | |
return new Promise(ok => { | |
setTimeout(() => ok(n), n * 1000); | |
}) | |
} | |
module.exports = delay; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// custom modules imports | |
const { asyncGeneratorsFactory } = require('./asyncGeneratorsFactory'); | |
const { asyncIterablesFactory } = require('./asyncIterablesFactory'); | |
const delay = require('./delay'); | |
const pipe = require('./pipe'); | |
// create an async iterable that produces numbers | |
const asyncIterable = asyncIterablesFactory(); | |
// create some functions to handle each async number and then... | |
const onlyOdd = function (n) { | |
if (n % 2) return n; | |
} | |
const pause = async function (n) { | |
await delay(2); | |
return n; | |
} | |
const toString = function (n) { | |
return String(n); | |
} | |
const newLine = function (str) { | |
return str + require('os').EOL; | |
} | |
// ...create the async generator version of them | |
const onlyOddAsyncGen = asyncGeneratorsFactory(onlyOdd); | |
const pauseAsyncGen = asyncGeneratorsFactory(pause); | |
const toStringGen = asyncGeneratorsFactory(toString); | |
const newLineGen = asyncGeneratorsFactory(newLine); | |
// start the iteration using my pipe function | |
(async function IIAFE() { | |
// it returns the outputSream | |
await pipe(asyncIterable, process.stdout, onlyOddAsyncGen, pauseAsyncGen, toStringGen, newLineGen); | |
})(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const pipe = function () { | |
const functionalPipe = (fns) => x => fns.reduce((v, f) => f(v), x); | |
return async function pipe(inputStream, outputStream, ...modifiers) { | |
const asyncIterable = modifiers.length ? functionalPipe(modifiers)(inputStream) : inputStream; | |
for await (const chunk of asyncIterable) { | |
outputStream.write(chunk); | |
} | |
return outputStream; | |
} | |
}(); | |
module.exports = pipe; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment